From 16f43234f608ad4238a926012ffb683a5c1b154a Mon Sep 17 00:00:00 2001 From: yuunlimm Date: Tue, 10 Dec 2024 22:42:48 -0800 Subject: [PATCH] [parquet-sdk][token_v2] migrate token ownership table [pt2] --- .../raw_v2_token_ownerships.rs | 39 +- .../token_v2_models/v2_token_ownerships.rs | 623 ++---------------- .../parquet_token_v2_processor.rs | 49 +- .../src/processors/token_v2_processor.rs | 69 +- .../src/config/processor_config.rs | 3 + .../src/parquet_processors/mod.rs | 26 + .../parquet_token_v2_processor.rs | 14 +- .../parquet_token_v2_extractor.rs | 93 +-- .../token_v2_processor/token_v2_extractor.rs | 4 +- 9 files changed, 274 insertions(+), 646 deletions(-) diff --git a/rust/processor/src/db/common/models/token_v2_models/raw_v2_token_ownerships.rs b/rust/processor/src/db/common/models/token_v2_models/raw_v2_token_ownerships.rs index 135be50c2..545a77ad7 100644 --- a/rust/processor/src/db/common/models/token_v2_models/raw_v2_token_ownerships.rs +++ b/rust/processor/src/db/common/models/token_v2_models/raw_v2_token_ownerships.rs @@ -257,7 +257,7 @@ impl RawTokenOwnershipV2 { prior_nft_ownership: &AHashMap, tokens_burned: &TokenV2Burned, object_metadatas: &ObjectAggregatedDataMapping, - conn: &mut DbPoolConnection<'_>, + conn: &mut Option>, query_retries: u32, query_retry_delay_ms: u64, ) -> anyhow::Result> { @@ -344,7 +344,7 @@ impl RawTokenOwnershipV2 { txn_timestamp: chrono::NaiveDateTime, prior_nft_ownership: &AHashMap, tokens_burned: &TokenV2Burned, - conn: &mut DbPoolConnection<'_>, + conn: &mut Option>, query_retries: u32, query_retry_delay_ms: u64, ) -> anyhow::Result> { @@ -370,7 +370,7 @@ impl RawTokenOwnershipV2 { txn_timestamp: chrono::NaiveDateTime, prior_nft_ownership: &AHashMap, tokens_burned: &TokenV2Burned, - conn: &mut DbPoolConnection<'_>, + conn: &mut Option>, query_retries: u32, query_retry_delay_ms: u64, ) -> anyhow::Result> { @@ -387,16 +387,9 @@ impl RawTokenOwnershipV2 { match prior_nft_ownership.get(&token_address) { Some(inner) => inner.owner_address.clone(), None => { - match CurrentTokenOwnershipV2Query::get_latest_owned_nft_by_token_data_id( - conn, - &token_address, - query_retries, - query_retry_delay_ms, - ) - .await - { - Ok(nft) => nft.owner_address.clone(), - Err(_) => { + match conn { + None => { + // TODO: update message tracing::error!( transaction_version = txn_version, lookup_key = &token_address, @@ -404,6 +397,26 @@ impl RawTokenOwnershipV2 { ); DEFAULT_OWNER_ADDRESS.to_string() }, + Some(ref mut conn) => { + match CurrentTokenOwnershipV2Query::get_latest_owned_nft_by_token_data_id( + conn, + &token_address, + query_retries, + query_retry_delay_ms, + ) + .await + { + Ok(nft) => nft.owner_address.clone(), + Err(_) => { + tracing::error!( + transaction_version = txn_version, + lookup_key = &token_address, + "Failed to find current_token_ownership_v2 for burned token. You probably should backfill db." + ); + DEFAULT_OWNER_ADDRESS.to_string() + }, + } + } } }, } diff --git a/rust/processor/src/db/parquet/models/token_v2_models/v2_token_ownerships.rs b/rust/processor/src/db/parquet/models/token_v2_models/v2_token_ownerships.rs index 22bc77c68..26410ac76 100644 --- a/rust/processor/src/db/parquet/models/token_v2_models/v2_token_ownerships.rs +++ b/rust/processor/src/db/parquet/models/token_v2_models/v2_token_ownerships.rs @@ -7,35 +7,17 @@ use crate::{ bq_analytics::generic_parquet_processor::{GetTimeStamp, HasVersion, NamedTable}, - db::{ - common::models::token_v2_models::{ - raw_v2_token_datas::RawTokenDataV2, - raw_v2_token_ownerships::NFTOwnershipV2, - v2_token_utils::{TokenStandard, TokenV2Burned, DEFAULT_OWNER_ADDRESS}, - }, - parquet::models::fungible_asset_models::parquet_v2_fungible_asset_balances::DEFAULT_AMOUNT_VALUE, - postgres::models::{ - object_models::v2_object_utils::{ObjectAggregatedDataMapping, ObjectWithMetadata}, - resources::FromWriteResource, - token_models::{token_utils::TokenWriteSet, tokens::TableHandleToOwner}, - token_v2_models::v2_token_ownerships::CurrentTokenOwnershipV2, - }, + db::common::models::token_v2_models::raw_v2_token_ownerships::{ + CurrentTokenOwnershipV2Convertible, RawCurrentTokenOwnershipV2, RawTokenOwnershipV2, + TokenOwnershipV2Convertible, }, - utils::util::{ensure_not_negative, standardize_address}, }; -use ahash::AHashMap; use allocative_derive::Allocative; -use anyhow::Context; -use aptos_protos::transaction::v1::{ - DeleteResource, DeleteTableItem, WriteResource, WriteTableItem, -}; -use bigdecimal::{BigDecimal, ToPrimitive, Zero}; +use bigdecimal::ToPrimitive; use field_count::FieldCount; use parquet_derive::ParquetRecordWriter; use serde::{Deserialize, Serialize}; -const LEGACY_DEFAULT_PROPERTY_VERSION: u64 = 0; - #[derive( Allocative, Clone, Debug, Default, Deserialize, FieldCount, ParquetRecordWriter, Serialize, )] @@ -72,544 +54,83 @@ impl GetTimeStamp for TokenOwnershipV2 { } } -impl TokenOwnershipV2 { - /// For nfts it's the same resources that we parse tokendatas from so we leverage the work done in there to get ownership data - /// Vecs are returned because there could be multiple transfers and we need to document each one here. - pub fn get_nft_v2_from_token_data( - token_data: &RawTokenDataV2, - object_metadatas: &ObjectAggregatedDataMapping, - ) -> anyhow::Result> { - let mut ownerships = vec![]; - - let object_data = object_metadatas - .get(&token_data.token_data_id) - .context("If token data exists objectcore must exist")?; - let object_core = object_data.object.object_core.clone(); - let token_data_id = token_data.token_data_id.clone(); - let owner_address = object_core.get_owner_address(); - let storage_id = token_data_id.clone(); - - // is_soulbound currently means if an object is completely untransferrable - // OR if only admin can transfer. Only the former is true soulbound but - // people might already be using it with the latter meaning so let's include both. - let is_soulbound = if object_data.untransferable.as_ref().is_some() { - true - } else { - !object_core.allow_ungated_transfer - }; - let non_transferrable_by_owner = !object_core.allow_ungated_transfer; - - ownerships.push(Self { - txn_version: token_data.transaction_version, - write_set_change_index: token_data.write_set_change_index, - token_data_id: token_data_id.clone(), - property_version_v1: LEGACY_DEFAULT_PROPERTY_VERSION, - owner_address: Some(owner_address.clone()), - storage_id: storage_id.clone(), - amount: DEFAULT_AMOUNT_VALUE.clone(), - table_type_v1: None, - token_properties_mutated_v1: None, - is_soulbound_v2: Some(is_soulbound), - token_standard: TokenStandard::V2.to_string(), - block_timestamp: token_data.transaction_timestamp, - non_transferrable_by_owner: Some(non_transferrable_by_owner), - }); - - // check if token was transferred - for (event_index, transfer_event) in &object_data.transfer_events { - // If it's a self transfer then skip - if transfer_event.get_to_address() == transfer_event.get_from_address() { - continue; - } - ownerships.push(Self { - txn_version: token_data.transaction_version, - // set to negative of event index to avoid collison with write set index - write_set_change_index: -1 * event_index, - token_data_id: token_data_id.clone(), - property_version_v1: LEGACY_DEFAULT_PROPERTY_VERSION, - // previous owner - owner_address: Some(transfer_event.get_from_address()), - storage_id: storage_id.clone(), - // soft delete - amount: DEFAULT_AMOUNT_VALUE.clone(), - table_type_v1: None, - token_properties_mutated_v1: None, - is_soulbound_v2: Some(is_soulbound), - token_standard: TokenStandard::V2.to_string(), - block_timestamp: token_data.transaction_timestamp, - non_transferrable_by_owner: Some(is_soulbound), - }); - } - Ok(ownerships) - } - - async fn get_burned_nft_v2_helper( - token_address: &str, - txn_version: i64, - write_set_change_index: i64, - txn_timestamp: chrono::NaiveDateTime, - prior_nft_ownership: &AHashMap, - tokens_burned: &TokenV2Burned, - ) -> anyhow::Result> { - let token_address = standardize_address(token_address); - if let Some(burn_event) = tokens_burned.get(&token_address) { - // 1. Try to lookup token address in burn event mapping - let previous_owner = - if let Some(previous_owner) = burn_event.get_previous_owner_address() { - previous_owner - } else { - // 2. If it doesn't exist in burn event mapping, then it must be an old burn event that doesn't contain previous_owner. - // Do a lookup to get previous owner. This is necessary because previous owner is part of current token ownerships primary key. - match prior_nft_ownership.get(&token_address) { - Some(inner) => inner.owner_address.clone(), - None => DEFAULT_OWNER_ADDRESS.to_string(), - } - }; - - let token_data_id = token_address.clone(); - let storage_id = token_data_id.clone(); - - return Ok(Some(( - Self { - txn_version, - write_set_change_index, - token_data_id: token_data_id.clone(), - property_version_v1: LEGACY_DEFAULT_PROPERTY_VERSION, - owner_address: Some(previous_owner.clone()), - storage_id: storage_id.clone(), - amount: DEFAULT_AMOUNT_VALUE.clone(), - table_type_v1: None, - token_properties_mutated_v1: None, - is_soulbound_v2: None, // default - token_standard: TokenStandard::V2.to_string(), - block_timestamp: txn_timestamp, - non_transferrable_by_owner: None, // default - }, - CurrentTokenOwnershipV2 { - token_data_id, - property_version_v1: BigDecimal::zero(), - owner_address: previous_owner, - storage_id, - amount: BigDecimal::zero(), - table_type_v1: None, - token_properties_mutated_v1: None, - is_soulbound_v2: None, // default - token_standard: TokenStandard::V2.to_string(), - is_fungible_v2: None, // default - last_transaction_version: txn_version, - last_transaction_timestamp: txn_timestamp, - non_transferrable_by_owner: None, // default - }, - ))); +impl TokenOwnershipV2Convertible for TokenOwnershipV2 { + fn from_raw(raw_item: RawTokenOwnershipV2) -> Self { + Self { + txn_version: raw_item.transaction_version, + write_set_change_index: raw_item.write_set_change_index, + token_data_id: raw_item.token_data_id, + property_version_v1: raw_item.property_version_v1.to_u64().unwrap(), + owner_address: raw_item.owner_address, + storage_id: raw_item.storage_id, + amount: raw_item.amount.to_string(), + table_type_v1: raw_item.table_type_v1, + token_properties_mutated_v1: raw_item + .token_properties_mutated_v1 + .map(|v| v.to_string()), + is_soulbound_v2: raw_item.is_soulbound_v2, + token_standard: raw_item.token_standard, + block_timestamp: raw_item.transaction_timestamp, + non_transferrable_by_owner: raw_item.non_transferrable_by_owner, } - Ok(None) } +} - /// We want to track tokens in any offer/claims and tokenstore - pub fn get_v1_from_delete_table_item( - table_item: &DeleteTableItem, - txn_version: i64, - write_set_change_index: i64, - txn_timestamp: chrono::NaiveDateTime, - table_handle_to_owner: &TableHandleToOwner, - ) -> anyhow::Result)>> { - let table_item_data = table_item.data.as_ref().unwrap(); - - let maybe_token_id = match TokenWriteSet::from_table_item_type( - table_item_data.key_type.as_str(), - &table_item_data.key, - txn_version, - )? { - Some(TokenWriteSet::TokenId(inner)) => Some(inner), - _ => None, - }; - - if let Some(token_id_struct) = maybe_token_id { - let table_handle = standardize_address(&table_item.handle.to_string()); - let token_data_id_struct = token_id_struct.token_data_id; - let token_data_id = token_data_id_struct.to_id(); - - let maybe_table_metadata = table_handle_to_owner.get(&table_handle); - let (curr_token_ownership, owner_address, table_type) = match maybe_table_metadata { - Some(tm) => { - if tm.table_type != "0x3::token::TokenStore" { - return Ok(None); - } - let owner_address = tm.get_owner_address(); - ( - Some(CurrentTokenOwnershipV2 { - token_data_id: token_data_id.clone(), - property_version_v1: token_id_struct.property_version.clone(), - owner_address: owner_address.clone(), - storage_id: table_handle.clone(), - amount: BigDecimal::zero(), - table_type_v1: Some(tm.table_type.clone()), - token_properties_mutated_v1: None, - is_soulbound_v2: None, - token_standard: TokenStandard::V1.to_string(), - is_fungible_v2: None, - last_transaction_version: txn_version, - last_transaction_timestamp: txn_timestamp, - non_transferrable_by_owner: None, - }), - Some(owner_address), - Some(tm.table_type.clone()), - ) - }, - None => (None, None, None), - }; - - Ok(Some(( - Self { - txn_version, - write_set_change_index, - token_data_id, - property_version_v1: token_id_struct.property_version.to_u64().unwrap(), - owner_address, - storage_id: table_handle, - amount: DEFAULT_AMOUNT_VALUE.clone(), - table_type_v1: table_type, - token_properties_mutated_v1: None, - is_soulbound_v2: None, - token_standard: TokenStandard::V1.to_string(), - block_timestamp: txn_timestamp, - non_transferrable_by_owner: None, - }, - curr_token_ownership, - ))) - } else { - Ok(None) - } - } - - /// We want to track tokens in any offer/claims and tokenstore - pub fn get_v1_from_write_table_item( - table_item: &WriteTableItem, - txn_version: i64, - write_set_change_index: i64, - txn_timestamp: chrono::NaiveDateTime, - table_handle_to_owner: &TableHandleToOwner, - ) -> anyhow::Result)>> { - let table_item_data = table_item.data.as_ref().unwrap(); - - let maybe_token = match TokenWriteSet::from_table_item_type( - table_item_data.value_type.as_str(), - &table_item_data.value, - txn_version, - )? { - Some(TokenWriteSet::Token(inner)) => Some(inner), - _ => None, - }; - - if let Some(token) = maybe_token { - let table_handle = standardize_address(&table_item.handle.to_string()); - let amount = ensure_not_negative(token.amount); - let token_id_struct = token.id; - let token_data_id_struct = token_id_struct.token_data_id; - let token_data_id = token_data_id_struct.to_id(); +#[derive( + Allocative, Clone, Debug, Default, Deserialize, FieldCount, ParquetRecordWriter, Serialize, +)] +pub struct CurrentTokenOwnershipV2 { + pub token_data_id: String, + pub property_version_v1: u64, // BigDecimal, + pub owner_address: String, + pub storage_id: String, + pub amount: String, // BigDecimal, + pub table_type_v1: Option, + pub token_properties_mutated_v1: Option, // Option, + pub is_soulbound_v2: Option, + pub token_standard: String, + pub is_fungible_v2: Option, + pub last_transaction_version: i64, + #[allocative(skip)] + pub last_transaction_timestamp: chrono::NaiveDateTime, + pub non_transferrable_by_owner: Option, +} - let maybe_table_metadata = table_handle_to_owner.get(&table_handle); - let (curr_token_ownership, owner_address, table_type) = match maybe_table_metadata { - Some(tm) => { - if tm.table_type != "0x3::token::TokenStore" { - return Ok(None); - } - let owner_address = tm.get_owner_address(); - ( - Some(CurrentTokenOwnershipV2 { - token_data_id: token_data_id.clone(), - property_version_v1: token_id_struct.property_version.clone(), - owner_address: owner_address.clone(), - storage_id: table_handle.clone(), - amount: amount.clone(), - table_type_v1: Some(tm.table_type.clone()), - token_properties_mutated_v1: Some(token.token_properties.clone()), - is_soulbound_v2: None, - token_standard: TokenStandard::V1.to_string(), - is_fungible_v2: None, - last_transaction_version: txn_version, - last_transaction_timestamp: txn_timestamp, - non_transferrable_by_owner: None, - }), - Some(owner_address), - Some(tm.table_type.clone()), - ) - }, - None => (None, None, None), - }; +impl NamedTable for CurrentTokenOwnershipV2 { + const TABLE_NAME: &'static str = "current_token_ownerships_v2"; +} - Ok(Some(( - Self { - txn_version, - write_set_change_index, - token_data_id, - property_version_v1: token_id_struct.property_version.to_u64().unwrap(), - owner_address, - storage_id: table_handle, - amount: amount.to_string(), - table_type_v1: table_type, - token_properties_mutated_v1: Some( - canonical_json::to_string(&token.token_properties).unwrap(), - ), - is_soulbound_v2: None, - token_standard: TokenStandard::V1.to_string(), - block_timestamp: txn_timestamp, - non_transferrable_by_owner: None, - }, - curr_token_ownership, - ))) - } else { - Ok(None) - } +impl HasVersion for CurrentTokenOwnershipV2 { + fn version(&self) -> i64 { + self.last_transaction_version } +} - pub async fn get_burned_nft_v2_from_write_resource( - write_resource: &WriteResource, - txn_version: i64, - write_set_change_index: i64, - txn_timestamp: chrono::NaiveDateTime, - prior_nft_ownership: &AHashMap, - tokens_burned: &TokenV2Burned, - object_metadatas: &ObjectAggregatedDataMapping, - ) -> anyhow::Result> { - let token_data_id = standardize_address(&write_resource.address.to_string()); - if tokens_burned - .get(&standardize_address(&token_data_id)) - .is_some() - { - if let Some(object) = &ObjectWithMetadata::from_write_resource(write_resource)? { - let object_core = &object.object_core; - let owner_address = object_core.get_owner_address(); - let storage_id = token_data_id.clone(); - - // is_soulbound currently means if an object is completely untransferrable - // OR if only admin can transfer. Only the former is true soulbound but - // people might already be using it with the latter meaning so let's include both. - let is_soulbound = if object_metadatas - .get(&token_data_id) - .map(|obj| obj.untransferable.as_ref()) - .is_some() - { - true - } else { - !object_core.allow_ungated_transfer - }; - let non_transferrable_by_owner = !object_core.allow_ungated_transfer; - - return Ok(Some(( - Self { - txn_version, - write_set_change_index, - token_data_id: token_data_id.clone(), - property_version_v1: LEGACY_DEFAULT_PROPERTY_VERSION, - owner_address: Some(owner_address.clone()), - storage_id: storage_id.clone(), - amount: DEFAULT_AMOUNT_VALUE.clone(), - table_type_v1: None, - token_properties_mutated_v1: None, - is_soulbound_v2: Some(is_soulbound), - token_standard: TokenStandard::V2.to_string(), - block_timestamp: txn_timestamp, - non_transferrable_by_owner: Some(non_transferrable_by_owner), - }, - CurrentTokenOwnershipV2 { - token_data_id, - property_version_v1: BigDecimal::zero(), - owner_address, - storage_id, - amount: BigDecimal::zero(), - table_type_v1: None, - token_properties_mutated_v1: None, - is_soulbound_v2: Some(is_soulbound), - token_standard: TokenStandard::V2.to_string(), - is_fungible_v2: Some(false), - last_transaction_version: txn_version, - last_transaction_timestamp: txn_timestamp, - non_transferrable_by_owner: Some(non_transferrable_by_owner), - }, - ))); - } else { - return Self::get_burned_nft_v2_helper( - &token_data_id, - txn_version, - write_set_change_index, - txn_timestamp, - prior_nft_ownership, - tokens_burned, - ) - .await; - } - } - Ok(None) +impl GetTimeStamp for CurrentTokenOwnershipV2 { + fn get_timestamp(&self) -> chrono::NaiveDateTime { + self.last_transaction_timestamp } +} - pub fn get_burned_nft_v2_from_delete_resource( - delete_resource: &DeleteResource, - txn_version: i64, - write_set_change_index: i64, - txn_timestamp: chrono::NaiveDateTime, - prior_nft_ownership: &AHashMap, - tokens_burned: &TokenV2Burned, - ) -> anyhow::Result> { - let token_address = standardize_address(&delete_resource.address.to_string()); - let token_address = standardize_address(&token_address); - if let Some(burn_event) = tokens_burned.get(&token_address) { - // 1. Try to lookup token address in burn event mapping - let previous_owner = - if let Some(previous_owner) = burn_event.get_previous_owner_address() { - previous_owner - } else { - // 2. If it doesn't exist in burn event mapping, then it must be an old burn event that doesn't contain previous_owner. - // Do a lookup to get previous owner. This is necessary because previous owner is part of current token ownerships primary key. - match prior_nft_ownership.get(&token_address) { - Some(inner) => inner.owner_address.clone(), - None => { - DEFAULT_OWNER_ADDRESS.to_string() // we don't want to query db to get the previous owner for parquet. - }, - } - }; - - let token_data_id = token_address.clone(); - let storage_id = token_data_id.clone(); - - return Ok(Some(( - Self { - txn_version, - write_set_change_index, - token_data_id: token_data_id.clone(), - property_version_v1: LEGACY_DEFAULT_PROPERTY_VERSION, - owner_address: Some(previous_owner.clone()), - storage_id: storage_id.clone(), - amount: DEFAULT_AMOUNT_VALUE.clone(), - table_type_v1: None, - token_properties_mutated_v1: None, - is_soulbound_v2: None, // default - token_standard: TokenStandard::V2.to_string(), - block_timestamp: txn_timestamp, - non_transferrable_by_owner: None, // default - }, - CurrentTokenOwnershipV2 { - token_data_id, - property_version_v1: BigDecimal::zero(), - owner_address: previous_owner, - storage_id, - amount: BigDecimal::zero(), - table_type_v1: None, - token_properties_mutated_v1: None, - is_soulbound_v2: None, // default - token_standard: TokenStandard::V2.to_string(), - is_fungible_v2: None, // default - last_transaction_version: txn_version, - last_transaction_timestamp: txn_timestamp, - non_transferrable_by_owner: None, // default - }, - ))); +// Facilitate tracking when a token is burned +impl CurrentTokenOwnershipV2Convertible for CurrentTokenOwnershipV2 { + fn from_raw(raw_item: RawCurrentTokenOwnershipV2) -> Self { + Self { + token_data_id: raw_item.token_data_id, + property_version_v1: raw_item.property_version_v1.to_u64().unwrap(), + owner_address: raw_item.owner_address, + storage_id: raw_item.storage_id, + amount: raw_item.amount.to_string(), + table_type_v1: raw_item.table_type_v1, + token_properties_mutated_v1: raw_item + .token_properties_mutated_v1 + .map(|v| v.to_string()), + is_soulbound_v2: raw_item.is_soulbound_v2, + token_standard: raw_item.token_standard, + is_fungible_v2: raw_item.is_fungible_v2, + last_transaction_version: raw_item.last_transaction_version, + last_transaction_timestamp: raw_item.last_transaction_timestamp, + non_transferrable_by_owner: raw_item.non_transferrable_by_owner, } - Ok(None) } } - -// // Copyright © Aptos Foundation -// // SPDX-License-Identifier: Apache-2.0 -// -// // This is required because a diesel macro makes clippy sad -// #![allow(clippy::extra_unused_lifetimes)] -// #![allow(clippy::unused_unit)] -// -// use crate::{ -// bq_analytics::generic_parquet_processor::{GetTimeStamp, HasVersion, NamedTable}, -// }; -// use allocative_derive::Allocative; -// use anyhow::Context; -// use field_count::FieldCount; -// use parquet_derive::ParquetRecordWriter; -// use serde::{Deserialize, Serialize}; -// use crate::db::common::models::token_v2_models::raw_v2_token_ownerships::TokenOwnershipV2Convertible; -// use crate::db::common::models::token_v2_models::raw_v2_token_ownerships::RawTokenOwnershipV2; -// use bigdecimal::{BigDecimal, ToPrimitive}; -// -// -// const LEGACY_DEFAULT_PROPERTY_VERSION: u64 = 0; -// -// #[derive( -// Allocative, Clone, Debug, Default, Deserialize, FieldCount, ParquetRecordWriter, Serialize, -// )] -// pub struct TokenOwnershipV2 { -// pub txn_version: i64, -// pub write_set_change_index: i64, -// pub token_data_id: String, -// pub property_version_v1: u64, -// pub owner_address: Option, -// pub storage_id: String, -// pub amount: String, // this is a string representation of a bigdecimal -// pub table_type_v1: Option, -// pub token_properties_mutated_v1: Option, -// pub is_soulbound_v2: Option, -// pub token_standard: String, -// #[allocative(skip)] -// pub block_timestamp: chrono::NaiveDateTime, -// pub non_transferrable_by_owner: Option, -// } -// -// impl NamedTable for TokenOwnershipV2 { -// const TABLE_NAME: &'static str = "token_ownerships_v2"; -// } -// -// impl HasVersion for TokenOwnershipV2 { -// fn version(&self) -> i64 { -// self.txn_version -// } -// } -// -// impl GetTimeStamp for TokenOwnershipV2 { -// fn get_timestamp(&self) -> chrono::NaiveDateTime { -// self.block_timestamp -// } -// } -// -// impl TokenOwnershipV2Convertible for TokenOwnershipV2 { -// fn from_raw(raw_item: RawTokenOwnershipV2) -> Self { -// Self { -// txn_version: raw_item.transaction_version, -// write_set_change_index: raw_item.write_set_change_index, -// token_data_id: raw_item.token_data_id, -// property_version_v1: raw_item.property_version_v1.to_u64().unwrap(), -// owner_address: raw_item.owner_address, -// storage_id: raw_item.storage_id, -// amount: raw_item.amount.to_string(), -// table_type_v1: raw_item.table_type_v1, -// token_properties_mutated_v1: raw_item.token_properties_mutated_v1.map(|v| v.to_string()), -// is_soulbound_v2: raw_item.is_soulbound_v2, -// token_standard: raw_item.token_standard, -// block_timestamp: raw_item.transaction_timestamp, -// non_transferrable_by_owner: raw_item.non_transferrable_by_owner, -// } -// } -// } -// -// #[derive( -// Clone, Debug, Deserialize, Eq, PartialEq, Serialize, -// )] -// pub struct CurrentTokenOwnershipV2 { -// pub token_data_id: String, -// pub property_version_v1: BigDecimal, -// pub owner_address: String, -// pub storage_id: String, -// pub amount: BigDecimal, -// pub table_type_v1: Option, -// pub token_properties_mutated_v1: Option, -// pub is_soulbound_v2: Option, -// pub token_standard: String, -// pub is_fungible_v2: Option, -// pub last_transaction_version: i64, -// pub last_transaction_timestamp: chrono::NaiveDateTime, -// pub non_transferrable_by_owner: Option, -// } -// -// // Facilitate tracking when a token is burned -// #[derive(Clone, Debug)] -// pub struct NFTOwnershipV2 { -// pub token_data_id: String, -// pub owner_address: String, -// pub is_soulbound: Option, -// } diff --git a/rust/processor/src/processors/parquet_processors/parquet_token_v2_processor.rs b/rust/processor/src/processors/parquet_processors/parquet_token_v2_processor.rs index 58ef21b15..3a76cf3d3 100644 --- a/rust/processor/src/processors/parquet_processors/parquet_token_v2_processor.rs +++ b/rust/processor/src/processors/parquet_processors/parquet_token_v2_processor.rs @@ -10,7 +10,9 @@ use crate::{ db::{ common::models::token_v2_models::{ raw_v2_token_datas::{RawTokenDataV2, TokenDataV2Convertible}, - raw_v2_token_ownerships::NFTOwnershipV2, + raw_v2_token_ownerships::{ + NFTOwnershipV2, RawTokenOwnershipV2, TokenOwnershipV2Convertible, + }, v2_token_utils::{ Burn, BurnEvent, MintEvent, TokenV2Burned, TokenV2Minted, TransferEvent, }, @@ -31,7 +33,7 @@ use crate::{ processors::{parquet_processors::ParquetProcessorTrait, ProcessorName, ProcessorTrait}, utils::{ counters::PROCESSOR_UNKNOWN_TYPE_COUNT, - database::ArcDbPool, + database::{ArcDbPool, DbPoolConnection}, util::{parse_timestamp, standardize_address}, }, }; @@ -131,9 +133,12 @@ impl ProcessorTrait for ParquetTokenV2Processor { let table_handle_to_owner = TableMetadataForToken::get_table_handle_to_owner_from_transactions(&transactions); - let (raw_token_datas_v2, token_ownerships_v2) = parse_v2_token( + let (raw_token_datas_v2, raw_token_ownerships_v2) = parse_v2_token( &transactions, &table_handle_to_owner, + &mut None, + 0, + 0, &mut transaction_version_to_struct_count, ) .await; @@ -152,8 +157,13 @@ impl ProcessorTrait for ParquetTokenV2Processor { .await .context("Failed to send token data v2 parquet data")?; + let parquet_token_ownerships_v2: Vec = raw_token_ownerships_v2 + .into_iter() + .map(TokenOwnershipV2::from_raw) + .collect(); + let token_ownerships_v2_parquet_data = ParquetDataGeneric { - data: token_ownerships_v2, + data: parquet_token_ownerships_v2, }; self.v2_token_ownerships_sender @@ -181,8 +191,11 @@ impl ProcessorTrait for ParquetTokenV2Processor { async fn parse_v2_token( transactions: &[Transaction], table_handle_to_owner: &TableHandleToOwner, + conn: &mut Option>, + query_retries: u32, + query_retry_delay_ms: u64, transaction_version_to_struct_count: &mut AHashMap, -) -> (Vec, Vec) { +) -> (Vec, Vec) { // Token V2 and V1 combined let mut token_datas_v2 = vec![]; let mut token_ownerships_v2 = vec![]; @@ -338,7 +351,7 @@ async fn parse_v2_token( .or_insert(1); } if let Some((token_ownership, current_token_ownership)) = - TokenOwnershipV2::get_v1_from_write_table_item( + RawTokenOwnershipV2::get_v1_from_write_table_item( table_item, txn_version, wsc_index, @@ -366,7 +379,7 @@ async fn parse_v2_token( }, Change::DeleteTableItem(table_item) => { if let Some((token_ownership, current_token_ownership)) = - TokenOwnershipV2::get_v1_from_delete_table_item( + RawTokenOwnershipV2::get_v1_from_delete_table_item( table_item, txn_version, wsc_index, @@ -404,11 +417,12 @@ async fn parse_v2_token( .unwrap() { // Add NFT ownership - let mut ownerships = TokenOwnershipV2::get_nft_v2_from_token_data( - &raw_token_data, - &token_v2_metadata_helper, - ) - .unwrap(); + let (mut ownerships, _) = + RawTokenOwnershipV2::get_nft_v2_from_token_data( + &raw_token_data, + &token_v2_metadata_helper, + ) + .unwrap(); if let Some(current_nft_ownership) = ownerships.first() { // Note that the first element in ownerships is the current ownership. We need to cache // it in prior_nft_ownership so that moving forward if we see a burn we'll know @@ -435,7 +449,7 @@ async fn parse_v2_token( } // Add burned NFT handling if let Some((nft_ownership, current_nft_ownership)) = - TokenOwnershipV2::get_burned_nft_v2_from_write_resource( + RawTokenOwnershipV2::get_burned_nft_v2_from_write_resource( resource, txn_version, wsc_index, @@ -443,6 +457,9 @@ async fn parse_v2_token( &prior_nft_ownership, &tokens_burned, &token_v2_metadata_helper, + conn, + query_retries, + query_retry_delay_ms, ) .await .unwrap() @@ -464,14 +481,18 @@ async fn parse_v2_token( }, Change::DeleteResource(resource) => { if let Some((nft_ownership, current_nft_ownership)) = - TokenOwnershipV2::get_burned_nft_v2_from_delete_resource( + RawTokenOwnershipV2::get_burned_nft_v2_from_delete_resource( resource, txn_version, wsc_index, txn_timestamp, &prior_nft_ownership, &tokens_burned, + conn, + query_retries, + query_retry_delay_ms, ) + .await .unwrap() { token_ownerships_v2.push(nft_ownership); diff --git a/rust/processor/src/processors/token_v2_processor.rs b/rust/processor/src/processors/token_v2_processor.rs index d54f87771..7286c708e 100644 --- a/rust/processor/src/processors/token_v2_processor.rs +++ b/rust/processor/src/processors/token_v2_processor.rs @@ -606,7 +606,7 @@ impl ProcessorTrait for TokenV2Processor { let processing_start = std::time::Instant::now(); let last_transaction_timestamp = transactions.last().unwrap().timestamp.clone(); - let mut conn = self.get_conn().await; + let conn = self.get_conn().await; // First get all token related table metadata from the batch of transactions. This is in case // an earlier transaction has metadata (in resources) that's missing from a later transaction. @@ -632,7 +632,7 @@ impl ProcessorTrait for TokenV2Processor { ) = parse_v2_token( &transactions, &table_handle_to_owner, - &mut conn, + &mut Some(conn), query_retries, query_retry_delay_ms, ) @@ -769,10 +769,30 @@ impl ProcessorTrait for TokenV2Processor { } } +pub async fn parse_v2_token_for_parquet( + transactions: &[Transaction], + table_handle_to_owner: &TableHandleToOwner, +) -> ( + Vec, + Vec, + Vec, + Vec, + Vec, + Vec, + Vec, + Vec, // deleted token ownerships + Vec, + Vec, + Vec, + Vec, +) { + parse_v2_token(transactions, table_handle_to_owner, &mut None, 0, 0).await +} + pub async fn parse_v2_token( transactions: &[Transaction], table_handle_to_owner: &TableHandleToOwner, - conn: &mut DbPoolConnection<'_>, + conn: &mut Option>, query_retries: u32, query_retry_delay_ms: u64, ) -> ( @@ -996,26 +1016,31 @@ pub async fn parse_v2_token( let wsc_index = index as i64; match wsc.change.as_ref().unwrap() { Change::WriteTableItem(table_item) => { - if let Some((collection, current_collection)) = - CollectionV2::get_v1_from_write_table_item( - table_item, - txn_version, - wsc_index, - txn_timestamp, - table_handle_to_owner, - conn, - query_retries, - query_retry_delay_ms, - ) - .await - .unwrap() - { - collections_v2.push(collection); - current_collections_v2.insert( - current_collection.collection_id.clone(), - current_collection, - ); + // TODO: revisit when we migrate collection_v2 for parquet + // for not it will be only handled for postgres + if let Some(ref mut conn) = conn { + if let Some((collection, current_collection)) = + CollectionV2::get_v1_from_write_table_item( + table_item, + txn_version, + wsc_index, + txn_timestamp, + table_handle_to_owner, + conn, + query_retries, + query_retry_delay_ms, + ) + .await + .unwrap() + { + collections_v2.push(collection); + current_collections_v2.insert( + current_collection.collection_id.clone(), + current_collection, + ); + } } + if let Some((token_data, current_token_data)) = RawTokenDataV2::get_v1_from_write_table_item( table_item, diff --git a/rust/sdk-processor/src/config/processor_config.rs b/rust/sdk-processor/src/config/processor_config.rs index af90a8837..d0812a1f0 100644 --- a/rust/sdk-processor/src/config/processor_config.rs +++ b/rust/sdk-processor/src/config/processor_config.rs @@ -33,6 +33,7 @@ use processor::{ v2_token_activities::TokenActivityV2, v2_token_datas::{CurrentTokenDataV2, TokenDataV2}, v2_token_metadata::CurrentTokenV2Metadata, + v2_token_ownerships::{CurrentTokenOwnershipV2, TokenOwnershipV2}, }, transaction_metadata_model::parquet_write_set_size_info::WriteSetSize, user_transaction_models::parquet_user_transactions::UserTransaction, @@ -179,6 +180,8 @@ impl ProcessorConfig { TokenActivityV2::TABLE_NAME.to_string(), TokenDataV2::TABLE_NAME.to_string(), CurrentTokenDataV2::TABLE_NAME.to_string(), + TokenOwnershipV2::TABLE_NAME.to_string(), + CurrentTokenOwnershipV2::TABLE_NAME.to_string(), ]), _ => HashSet::new(), // Default case for unsupported processors } diff --git a/rust/sdk-processor/src/parquet_processors/mod.rs b/rust/sdk-processor/src/parquet_processors/mod.rs index ae04286ea..73143db74 100644 --- a/rust/sdk-processor/src/parquet_processors/mod.rs +++ b/rust/sdk-processor/src/parquet_processors/mod.rs @@ -39,6 +39,7 @@ use processor::{ v2_token_activities::TokenActivityV2, v2_token_datas::{CurrentTokenDataV2, TokenDataV2}, v2_token_metadata::CurrentTokenV2Metadata, + v2_token_ownerships::{CurrentTokenOwnershipV2, TokenOwnershipV2}, }, transaction_metadata_model::parquet_write_set_size_info::WriteSetSize, user_transaction_models::parquet_user_transactions::UserTransaction, @@ -120,6 +121,8 @@ pub enum ParquetTypeEnum { TokenActivitiesV2, TokenDatasV2, CurrentTokenDatasV2, + TokenOwnershipsV2, + CurrentTokenOwnershipsV2, } /// Trait for handling various Parquet types. @@ -213,6 +216,11 @@ impl_parquet_trait!( impl_parquet_trait!(TokenActivityV2, ParquetTypeEnum::TokenActivitiesV2); impl_parquet_trait!(TokenDataV2, ParquetTypeEnum::TokenDatasV2); impl_parquet_trait!(CurrentTokenDataV2, ParquetTypeEnum::CurrentTokenDatasV2); +impl_parquet_trait!(TokenOwnershipV2, ParquetTypeEnum::TokenOwnershipsV2); +impl_parquet_trait!( + CurrentTokenOwnershipV2, + ParquetTypeEnum::CurrentTokenOwnershipsV2 +); #[derive(Debug, Clone)] #[enum_dispatch(ParquetTypeTrait)] @@ -241,6 +249,8 @@ pub enum ParquetTypeStructs { TokenActivityV2(Vec), TokenDataV2(Vec), CurrentTokenDataV2(Vec), + TokenOwnershipV2(Vec), + CurrentTokenOwnershipV2(Vec), } impl ParquetTypeStructs { @@ -292,6 +302,10 @@ impl ParquetTypeStructs { ParquetTypeEnum::CurrentTokenDatasV2 => { ParquetTypeStructs::CurrentTokenDataV2(Vec::new()) }, + ParquetTypeEnum::TokenOwnershipsV2 => ParquetTypeStructs::TokenOwnershipV2(Vec::new()), + ParquetTypeEnum::CurrentTokenOwnershipsV2 => { + ParquetTypeStructs::CurrentTokenOwnershipV2(Vec::new()) + }, } } @@ -446,6 +460,18 @@ impl ParquetTypeStructs { ) => { handle_append!(self_data, other_data) }, + ( + ParquetTypeStructs::TokenOwnershipV2(self_data), + ParquetTypeStructs::TokenOwnershipV2(other_data), + ) => { + handle_append!(self_data, other_data) + }, + ( + ParquetTypeStructs::CurrentTokenOwnershipV2(self_data), + ParquetTypeStructs::CurrentTokenOwnershipV2(other_data), + ) => { + handle_append!(self_data, other_data) + }, _ => Err(ProcessorError::ProcessError { message: "Mismatched buffer types in append operation".to_string(), }), diff --git a/rust/sdk-processor/src/parquet_processors/parquet_token_v2_processor.rs b/rust/sdk-processor/src/parquet_processors/parquet_token_v2_processor.rs index 02bab9ad4..ad948dfc3 100644 --- a/rust/sdk-processor/src/parquet_processors/parquet_token_v2_processor.rs +++ b/rust/sdk-processor/src/parquet_processors/parquet_token_v2_processor.rs @@ -40,6 +40,7 @@ use processor::{ }; use std::{collections::HashMap, sync::Arc}; use tracing::{debug, info}; +use processor::db::parquet::models::token_v2_models::v2_token_ownerships::{CurrentTokenOwnershipV2, TokenOwnershipV2}; pub struct ParquetTokenV2Processor { pub config: IndexerProcessorConfig, @@ -120,8 +121,9 @@ impl ProcessorTrait for ParquetTokenV2Processor { let backfill_table = set_backfill_table_flag(parquet_processor_config.backfill_table); // TODO: Update this - let parquet_token_v2_extractor = - ParquetTokenV2Extractor::new(backfill_table, 5, 500, self.db_pool.clone()); + let parquet_token_v2_extractor = ParquetTokenV2Extractor { + opt_in_tables: backfill_table, + }; let gcs_client = initialize_gcs_client(parquet_db_config.google_application_credentials.clone()).await; @@ -150,9 +152,13 @@ impl ProcessorTrait for ParquetTokenV2Processor { CurrentTokenDataV2::schema(), ), ( - ParquetTypeEnum::CurrentTokenDatasV2, - CurrentTokenDataV2::schema(), + ParquetTypeEnum::TokenOwnershipsV2, + TokenOwnershipV2::schema(), ), + ( + ParquetTypeEnum::CurrentTokenOwnershipsV2, + CurrentTokenOwnershipV2::schema(), + ) ] .into_iter() .collect(); diff --git a/rust/sdk-processor/src/steps/parquet_token_v2_processor/parquet_token_v2_extractor.rs b/rust/sdk-processor/src/steps/parquet_token_v2_processor/parquet_token_v2_extractor.rs index be8bb4730..59a0125e2 100644 --- a/rust/sdk-processor/src/steps/parquet_token_v2_processor/parquet_token_v2_extractor.rs +++ b/rust/sdk-processor/src/steps/parquet_token_v2_processor/parquet_token_v2_extractor.rs @@ -1,6 +1,6 @@ use crate::{ parquet_processors::{ParquetTypeEnum, ParquetTypeStructs}, - utils::{database::ArcDbPool, parquet_extractor_helper::add_to_map_if_opted_in_for_backfill}, + utils::parquet_extractor_helper::add_to_map_if_opted_in_for_backfill, }; use aptos_indexer_processor_sdk::{ aptos_protos::transaction::v1::Transaction, @@ -17,6 +17,9 @@ use processor::{ raw_v2_token_activities::TokenActivityV2Convertible, raw_v2_token_datas::{CurrentTokenDataV2Convertible, TokenDataV2Convertible}, raw_v2_token_metadata::CurrentTokenV2MetadataConvertible, + raw_v2_token_ownerships::{ + CurrentTokenOwnershipV2Convertible, TokenOwnershipV2Convertible, + }, }, parquet::models::token_v2_models::{ token_claims::CurrentTokenPendingClaim, @@ -24,6 +27,7 @@ use processor::{ v2_token_activities::TokenActivityV2, v2_token_datas::{CurrentTokenDataV2, TokenDataV2}, v2_token_metadata::CurrentTokenV2Metadata, + v2_token_ownerships::{CurrentTokenOwnershipV2, TokenOwnershipV2}, }, postgres::models::token_models::tokens::TableMetadataForToken, }, @@ -39,27 +43,8 @@ where Self: Processable + Send + Sized + 'static, { pub opt_in_tables: TableFlags, - // TODO: Revisit and remove - query_retries: u32, - query_retry_delay_ms: u64, - conn_pool: ArcDbPool, } -impl ParquetTokenV2Extractor { - pub fn new( - opt_in_tables: TableFlags, - query_retries: u32, - query_retry_delay_ms: u64, - conn_pool: ArcDbPool, - ) -> Self { - Self { - opt_in_tables, - query_retries, - query_retry_delay_ms, - conn_pool, - } - } -} type ParquetTypeMap = HashMap; #[async_trait] @@ -72,15 +57,6 @@ impl Processable for ParquetTokenV2Extractor { &mut self, transactions: TransactionContext, ) -> anyhow::Result>, ProcessorError> { - let mut conn = self - .conn_pool - .get() - .await - .map_err(|e| ProcessorError::DBStoreError { - message: format!("Failed to get connection from pool: {:?}", e), - query: None, - })?; - // First get all token related table metadata from the batch of transactions. This is in case // an earlier transaction has metadata (in resources) that's missing from a later transaction. let table_handle_to_owner: ahash::AHashMap = @@ -89,24 +65,17 @@ impl Processable for ParquetTokenV2Extractor { let ( _collections_v2, raw_token_datas_v2, - _token_ownerships_v2, + raw_token_ownerships_v2, _current_collections_v2, raw_current_token_datas_v2, raw_current_deleted_token_datas_v2, - _current_token_ownerships_v2, - _current_deleted_token_ownerships_v2, + raw_current_token_ownerships_v2, + raw_current_deleted_token_ownerships_v2, raw_token_activities_v2, raw_current_token_v2_metadata, raw_current_token_royalties_v1, raw_current_token_claims, - ) = parse_v2_token( - &transactions.data, - &table_handle_to_owner, - &mut conn, - self.query_retries, - self.query_retry_delay_ms, - ) - .await; + ) = parse_v2_token(&transactions.data, &table_handle_to_owner, &mut None, 0, 0).await; let parquet_current_token_claims: Vec = raw_current_token_claims .into_iter() @@ -146,6 +115,23 @@ impl Processable for ParquetTokenV2Extractor { .map(CurrentTokenDataV2::from_raw) .collect(); + let parquet_token_ownerships_v2: Vec = raw_token_ownerships_v2 + .into_iter() + .map(TokenOwnershipV2::from_raw) + .collect(); + + let parquet_current_token_ownerships_v2: Vec = + raw_current_token_ownerships_v2 + .into_iter() + .map(CurrentTokenOwnershipV2::from_raw) + .collect(); + + let parquet_deleted_current_token_ownerships_v2: Vec = + raw_current_deleted_token_ownerships_v2 + .into_iter() + .map(CurrentTokenOwnershipV2::from_raw) + .collect(); + // Print the size of each extracted data type debug!("Processed data sizes:"); debug!( @@ -170,6 +156,15 @@ impl Processable for ParquetTokenV2Extractor { " - CurrentDeletedTokenDataV2: {}", parquet_deleted_current_token_datss_v2.len() ); + debug!(" - TokenOwnershipV2: {}", parquet_token_ownerships_v2.len()); + debug!( + " - CurrentTokenOwnershipV2: {}", + parquet_current_token_ownerships_v2.len() + ); + debug!( + " - CurrentDeletedTokenOwnershipV2: {}", + parquet_deleted_current_token_ownerships_v2.len() + ); // We are merging these two tables, b/c they are essentially the same table let mut combined_current_token_datas_v2: Vec = Vec::new(); @@ -180,6 +175,14 @@ impl Processable for ParquetTokenV2Extractor { .iter() .for_each(|x| combined_current_token_datas_v2.push(x.clone())); + let mut merged_current_token_ownerships_v2: Vec = Vec::new(); + parquet_current_token_ownerships_v2 + .iter() + .for_each(|x| merged_current_token_ownerships_v2.push(x.clone())); + parquet_deleted_current_token_ownerships_v2 + .iter() + .for_each(|x| merged_current_token_ownerships_v2.push(x.clone())); + let mut map: HashMap = HashMap::new(); // Array of tuples for each data type and its corresponding enum variant and flag @@ -214,6 +217,16 @@ impl Processable for ParquetTokenV2Extractor { ParquetTypeEnum::CurrentTokenDatasV2, ParquetTypeStructs::CurrentTokenDataV2(combined_current_token_datas_v2), ), + ( + TableFlags::TOKEN_OWNERSHIPS_V2, + ParquetTypeEnum::TokenOwnershipsV2, + ParquetTypeStructs::TokenOwnershipV2(parquet_token_ownerships_v2), + ), + ( + TableFlags::CURRENT_TOKEN_OWNERSHIPS_V2, + ParquetTypeEnum::CurrentTokenOwnershipsV2, + ParquetTypeStructs::CurrentTokenOwnershipV2(merged_current_token_ownerships_v2), + ), ]; // Populate the map based on opt-in tables diff --git a/rust/sdk-processor/src/steps/token_v2_processor/token_v2_extractor.rs b/rust/sdk-processor/src/steps/token_v2_processor/token_v2_extractor.rs index 41900dd4f..7b0ed613a 100644 --- a/rust/sdk-processor/src/steps/token_v2_processor/token_v2_extractor.rs +++ b/rust/sdk-processor/src/steps/token_v2_processor/token_v2_extractor.rs @@ -94,7 +94,7 @@ impl Processable for TokenV2Extractor { >, ProcessorError, > { - let mut conn = self + let conn = self .conn_pool .get() .await @@ -124,7 +124,7 @@ impl Processable for TokenV2Extractor { ) = parse_v2_token( &transactions.data, &table_handle_to_owner, - &mut conn, + &mut Some(conn), self.query_retries, self.query_retry_delay_ms, )