diff --git a/rust/processor/src/db/common/models/mod.rs b/rust/processor/src/db/common/models/mod.rs index 45d1687a3..6e0395836 100644 --- a/rust/processor/src/db/common/models/mod.rs +++ b/rust/processor/src/db/common/models/mod.rs @@ -3,3 +3,4 @@ pub mod ans_models; pub mod default_models; pub mod event_models; pub mod fungible_asset_models; +pub mod token_v2_models; diff --git a/rust/processor/src/db/common/models/token_v2_models/mod.rs b/rust/processor/src/db/common/models/token_v2_models/mod.rs new file mode 100644 index 000000000..aabf2fddd --- /dev/null +++ b/rust/processor/src/db/common/models/token_v2_models/mod.rs @@ -0,0 +1 @@ +pub mod raw_token_claims; diff --git a/rust/processor/src/db/common/models/token_v2_models/raw_token_claims.rs b/rust/processor/src/db/common/models/token_v2_models/raw_token_claims.rs new file mode 100644 index 000000000..881440bc5 --- /dev/null +++ b/rust/processor/src/db/common/models/token_v2_models/raw_token_claims.rs @@ -0,0 +1,215 @@ +// Copyright © Aptos Foundation +// SPDX-License-Identifier: Apache-2.0 + +// This is required because a diesel macro makes clippy sad +#![allow(clippy::extra_unused_lifetimes)] +#![allow(clippy::unused_unit)] + +use crate::{ + db::postgres::models::{ + token_models::{token_utils::TokenWriteSet, tokens::TableHandleToOwner}, + token_v2_models::v2_token_activities::TokenActivityHelperV1, + }, + utils::util::standardize_address, +}; +use ahash::AHashMap; +use aptos_protos::transaction::v1::{DeleteTableItem, WriteTableItem}; +use bigdecimal::{BigDecimal, Zero}; +use serde::{Deserialize, Serialize}; + +// Map to keep track of the metadata of token offers that were claimed. The key is the token data id of the offer. +// Potentially it'd also be useful to keep track of offers that were canceled. +pub type TokenV1Claimed = AHashMap; + +#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)] +pub struct RawCurrentTokenPendingClaim { + pub token_data_id_hash: String, + pub property_version: BigDecimal, + pub from_address: String, + pub to_address: String, + pub collection_data_id_hash: String, + pub creator_address: String, + pub collection_name: String, + pub name: String, + pub amount: BigDecimal, + pub table_handle: String, + pub last_transaction_version: i64, + pub last_transaction_timestamp: chrono::NaiveDateTime, + pub token_data_id: String, + pub collection_id: String, +} + +impl Ord for RawCurrentTokenPendingClaim { + fn cmp(&self, other: &Self) -> std::cmp::Ordering { + self.token_data_id_hash + .cmp(&other.token_data_id_hash) + .then(self.property_version.cmp(&other.property_version)) + .then(self.from_address.cmp(&other.from_address)) + .then(self.to_address.cmp(&other.to_address)) + } +} + +impl PartialOrd for RawCurrentTokenPendingClaim { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + +impl RawCurrentTokenPendingClaim { + /// Token claim is stored in a table in the offerer's account. The key is token_offer_id (token_id + to address) + /// and value is token (token_id + amount) + pub fn from_write_table_item( + table_item: &WriteTableItem, + txn_version: i64, + txn_timestamp: chrono::NaiveDateTime, + table_handle_to_owner: &TableHandleToOwner, + ) -> anyhow::Result> { + let table_item_data = table_item.data.as_ref().unwrap(); + + let maybe_offer = match TokenWriteSet::from_table_item_type( + table_item_data.key_type.as_str(), + &table_item_data.key, + txn_version, + )? { + Some(TokenWriteSet::TokenOfferId(inner)) => Some(inner), + _ => None, + }; + if let Some(offer) = &maybe_offer { + let maybe_token = match TokenWriteSet::from_table_item_type( + table_item_data.value_type.as_str(), + &table_item_data.value, + txn_version, + )? { + Some(TokenWriteSet::Token(inner)) => Some(inner), + _ => None, + }; + if let Some(token) = &maybe_token { + let table_handle = standardize_address(&table_item.handle.to_string()); + + let maybe_table_metadata = table_handle_to_owner.get(&table_handle); + + if let Some(table_metadata) = maybe_table_metadata { + let token_id = offer.token_id.clone(); + let token_data_id_struct = token_id.token_data_id; + let collection_data_id_hash = + token_data_id_struct.get_collection_data_id_hash(); + let token_data_id_hash = token_data_id_struct.to_hash(); + // Basically adding 0x prefix to the previous 2 lines. This is to be consistent with Token V2 + let collection_id = token_data_id_struct.get_collection_id(); + let token_data_id = token_data_id_struct.to_id(); + let collection_name = token_data_id_struct.get_collection_trunc(); + let name = token_data_id_struct.get_name_trunc(); + + return Ok(Some(Self { + token_data_id_hash, + property_version: token_id.property_version, + from_address: table_metadata.get_owner_address(), + to_address: offer.get_to_address(), + collection_data_id_hash, + creator_address: token_data_id_struct.get_creator_address(), + collection_name, + name, + amount: token.amount.clone(), + table_handle, + last_transaction_version: txn_version, + last_transaction_timestamp: txn_timestamp, + token_data_id, + collection_id, + })); + } else { + tracing::warn!( + transaction_version = txn_version, + table_handle = table_handle, + "Missing table handle metadata for TokenClaim. {:?}", + table_handle_to_owner + ); + } + } else { + tracing::warn!( + transaction_version = txn_version, + value_type = table_item_data.value_type, + value = table_item_data.value, + "Expecting token as value for key = token_offer_id", + ); + } + } + Ok(None) + } + + pub fn from_delete_table_item( + table_item: &DeleteTableItem, + txn_version: i64, + txn_timestamp: chrono::NaiveDateTime, + table_handle_to_owner: &TableHandleToOwner, + tokens_claimed: &TokenV1Claimed, + ) -> anyhow::Result> { + let table_item_data = table_item.data.as_ref().unwrap(); + + let maybe_offer = match TokenWriteSet::from_table_item_type( + table_item_data.key_type.as_str(), + &table_item_data.key, + txn_version, + )? { + Some(TokenWriteSet::TokenOfferId(inner)) => Some(inner), + _ => None, + }; + if let Some(offer) = &maybe_offer { + let table_handle = standardize_address(&table_item.handle.to_string()); + let token_data_id = offer.token_id.token_data_id.to_id(); + + // Try to find owner from write resources + let mut maybe_owner_address = table_handle_to_owner + .get(&table_handle) + .map(|table_metadata| table_metadata.get_owner_address()); + + // If table handle isn't in TableHandleToOwner, try to find owner from token v1 claim events + if maybe_owner_address.is_none() { + if let Some(token_claimed) = tokens_claimed.get(&token_data_id) { + maybe_owner_address = token_claimed.from_address.clone(); + } + } + + let owner_address = maybe_owner_address.unwrap_or_else(|| { + panic!( + "Missing table handle metadata for claim. \ + Version: {}, table handle for PendingClaims: {}, all metadata: {:?} \ + Missing token data id in token claim event. \ + token_data_id: {}, all token claim events: {:?}", + txn_version, table_handle, table_handle_to_owner, token_data_id, tokens_claimed + ) + }); + + let token_id = offer.token_id.clone(); + let token_data_id_struct = token_id.token_data_id; + let collection_data_id_hash = token_data_id_struct.get_collection_data_id_hash(); + let token_data_id_hash = token_data_id_struct.to_hash(); + // Basically adding 0x prefix to the previous 2 lines. This is to be consistent with Token V2 + let collection_id = token_data_id_struct.get_collection_id(); + let token_data_id = token_data_id_struct.to_id(); + let collection_name = token_data_id_struct.get_collection_trunc(); + let name = token_data_id_struct.get_name_trunc(); + + return Ok(Some(Self { + token_data_id_hash, + property_version: token_id.property_version, + from_address: owner_address, + to_address: offer.get_to_address(), + collection_data_id_hash, + creator_address: token_data_id_struct.get_creator_address(), + collection_name, + name, + amount: BigDecimal::zero(), + table_handle, + last_transaction_version: txn_version, + last_transaction_timestamp: txn_timestamp, + token_data_id, + collection_id, + })); + } + Ok(None) + } +} + +pub trait CurrentTokenPendingClaimConvertible { + fn from_raw(raw_item: RawCurrentTokenPendingClaim) -> Self; +} diff --git a/rust/processor/src/db/parquet/models/mod.rs b/rust/processor/src/db/parquet/models/mod.rs index b4c5ec67a..ca073da73 100644 --- a/rust/processor/src/db/parquet/models/mod.rs +++ b/rust/processor/src/db/parquet/models/mod.rs @@ -3,5 +3,6 @@ pub mod ans_models; pub mod default_models; pub mod event_models; pub mod fungible_asset_models; +pub mod token_v2_models; pub mod transaction_metadata_model; pub mod user_transaction_models; diff --git a/rust/processor/src/db/parquet/models/token_v2_models/mod.rs b/rust/processor/src/db/parquet/models/token_v2_models/mod.rs new file mode 100644 index 000000000..4dd8dd1e0 --- /dev/null +++ b/rust/processor/src/db/parquet/models/token_v2_models/mod.rs @@ -0,0 +1 @@ +pub mod token_claims; diff --git a/rust/processor/src/db/parquet/models/token_v2_models/token_claims.rs b/rust/processor/src/db/parquet/models/token_v2_models/token_claims.rs new file mode 100644 index 000000000..e4ed3d32f --- /dev/null +++ b/rust/processor/src/db/parquet/models/token_v2_models/token_claims.rs @@ -0,0 +1,80 @@ +// Copyright © Aptos Foundation +// SPDX-License-Identifier: Apache-2.0 + +// This is required because a diesel macro makes clippy sad +#![allow(clippy::extra_unused_lifetimes)] +#![allow(clippy::unused_unit)] + +use crate::{ + bq_analytics::generic_parquet_processor::{GetTimeStamp, HasVersion, NamedTable}, + db::common::models::token_v2_models::raw_token_claims::{ + CurrentTokenPendingClaimConvertible, RawCurrentTokenPendingClaim, + }, +}; +use allocative_derive::Allocative; +use bigdecimal::ToPrimitive; +use field_count::FieldCount; +use parquet_derive::ParquetRecordWriter; +use serde::{Deserialize, Serialize}; + +#[derive( + Allocative, Clone, Debug, Default, Deserialize, FieldCount, ParquetRecordWriter, Serialize, +)] +pub struct CurrentTokenPendingClaim { + pub token_data_id_hash: String, + pub property_version: u64, + pub from_address: String, + pub to_address: String, + pub collection_data_id_hash: String, + pub creator_address: String, + pub collection_name: String, + pub name: String, + pub amount: String, // String format of BigDecimal + pub table_handle: String, + pub last_transaction_version: i64, + #[allocative(skip)] + pub last_transaction_timestamp: chrono::NaiveDateTime, + pub token_data_id: String, + pub collection_id: String, +} + +impl NamedTable for CurrentTokenPendingClaim { + const TABLE_NAME: &'static str = "current_token_pending_claims"; +} + +impl HasVersion for CurrentTokenPendingClaim { + fn version(&self) -> i64 { + self.last_transaction_version + } +} + +impl GetTimeStamp for CurrentTokenPendingClaim { + fn get_timestamp(&self) -> chrono::NaiveDateTime { + self.last_transaction_timestamp + } +} + +impl CurrentTokenPendingClaimConvertible for CurrentTokenPendingClaim { + // TODO: consider returning a Result + fn from_raw(raw_item: RawCurrentTokenPendingClaim) -> Self { + Self { + token_data_id_hash: raw_item.token_data_id_hash, + property_version: raw_item + .property_version + .to_u64() + .expect("Failed to convert property_version to u64"), + from_address: raw_item.from_address, + to_address: raw_item.to_address, + collection_data_id_hash: raw_item.collection_data_id_hash, + creator_address: raw_item.creator_address, + collection_name: raw_item.collection_name, + name: raw_item.name, + amount: raw_item.amount.to_string(), // (assuming amount is non-critical) + table_handle: raw_item.table_handle, + last_transaction_version: raw_item.last_transaction_version, + last_transaction_timestamp: raw_item.last_transaction_timestamp, + token_data_id: raw_item.token_data_id, + collection_id: raw_item.collection_id, + } + } +} diff --git a/rust/processor/src/db/postgres/models/token_models/token_claims.rs b/rust/processor/src/db/postgres/models/token_models/token_claims.rs index 90b709cd5..5edc60338 100644 --- a/rust/processor/src/db/postgres/models/token_models/token_claims.rs +++ b/rust/processor/src/db/postgres/models/token_models/token_claims.rs @@ -5,21 +5,16 @@ #![allow(clippy::extra_unused_lifetimes)] #![allow(clippy::unused_unit)] -use super::{token_utils::TokenWriteSet, tokens::TableHandleToOwner}; use crate::{ - db::postgres::models::token_v2_models::v2_token_activities::TokenActivityHelperV1, - schema::current_token_pending_claims, utils::util::standardize_address, + db::common::models::token_v2_models::raw_token_claims::{ + CurrentTokenPendingClaimConvertible, RawCurrentTokenPendingClaim, + }, + schema::current_token_pending_claims, }; -use ahash::AHashMap; -use aptos_protos::transaction::v1::{DeleteTableItem, WriteTableItem}; -use bigdecimal::{BigDecimal, Zero}; +use bigdecimal::BigDecimal; use field_count::FieldCount; use serde::{Deserialize, Serialize}; -// Map to keep track of the metadata of token offers that were claimed. The key is the token data id of the offer. -// Potentially it'd also be useful to keep track of offers that were canceled. -pub type TokenV1Claimed = AHashMap; - #[derive( Clone, Debug, Deserialize, Eq, FieldCount, Identifiable, Insertable, PartialEq, Serialize, )] @@ -58,157 +53,23 @@ impl PartialOrd for CurrentTokenPendingClaim { } } -impl CurrentTokenPendingClaim { - /// Token claim is stored in a table in the offerer's account. The key is token_offer_id (token_id + to address) - /// and value is token (token_id + amount) - pub fn from_write_table_item( - table_item: &WriteTableItem, - txn_version: i64, - txn_timestamp: chrono::NaiveDateTime, - table_handle_to_owner: &TableHandleToOwner, - ) -> anyhow::Result> { - let table_item_data = table_item.data.as_ref().unwrap(); - - let maybe_offer = match TokenWriteSet::from_table_item_type( - table_item_data.key_type.as_str(), - &table_item_data.key, - txn_version, - )? { - Some(TokenWriteSet::TokenOfferId(inner)) => Some(inner), - _ => None, - }; - if let Some(offer) = &maybe_offer { - let maybe_token = match TokenWriteSet::from_table_item_type( - table_item_data.value_type.as_str(), - &table_item_data.value, - txn_version, - )? { - Some(TokenWriteSet::Token(inner)) => Some(inner), - _ => None, - }; - if let Some(token) = &maybe_token { - let table_handle = standardize_address(&table_item.handle.to_string()); - - let maybe_table_metadata = table_handle_to_owner.get(&table_handle); - - if let Some(table_metadata) = maybe_table_metadata { - let token_id = offer.token_id.clone(); - let token_data_id_struct = token_id.token_data_id; - let collection_data_id_hash = - token_data_id_struct.get_collection_data_id_hash(); - let token_data_id_hash = token_data_id_struct.to_hash(); - // Basically adding 0x prefix to the previous 2 lines. This is to be consistent with Token V2 - let collection_id = token_data_id_struct.get_collection_id(); - let token_data_id = token_data_id_struct.to_id(); - let collection_name = token_data_id_struct.get_collection_trunc(); - let name = token_data_id_struct.get_name_trunc(); - - return Ok(Some(Self { - token_data_id_hash, - property_version: token_id.property_version, - from_address: table_metadata.get_owner_address(), - to_address: offer.get_to_address(), - collection_data_id_hash, - creator_address: token_data_id_struct.get_creator_address(), - collection_name, - name, - amount: token.amount.clone(), - table_handle, - last_transaction_version: txn_version, - last_transaction_timestamp: txn_timestamp, - token_data_id, - collection_id, - })); - } else { - tracing::warn!( - transaction_version = txn_version, - table_handle = table_handle, - "Missing table handle metadata for TokenClaim. {:?}", - table_handle_to_owner - ); - } - } else { - tracing::warn!( - transaction_version = txn_version, - value_type = table_item_data.value_type, - value = table_item_data.value, - "Expecting token as value for key = token_offer_id", - ); - } - } - Ok(None) - } - - pub fn from_delete_table_item( - table_item: &DeleteTableItem, - txn_version: i64, - txn_timestamp: chrono::NaiveDateTime, - table_handle_to_owner: &TableHandleToOwner, - tokens_claimed: &TokenV1Claimed, - ) -> anyhow::Result> { - let table_item_data = table_item.data.as_ref().unwrap(); - - let maybe_offer = match TokenWriteSet::from_table_item_type( - table_item_data.key_type.as_str(), - &table_item_data.key, - txn_version, - )? { - Some(TokenWriteSet::TokenOfferId(inner)) => Some(inner), - _ => None, - }; - if let Some(offer) = &maybe_offer { - let table_handle = standardize_address(&table_item.handle.to_string()); - let token_data_id = offer.token_id.token_data_id.to_id(); - - // Try to find owner from write resources - let mut maybe_owner_address = table_handle_to_owner - .get(&table_handle) - .map(|table_metadata| table_metadata.get_owner_address()); - - // If table handle isn't in TableHandleToOwner, try to find owner from token v1 claim events - if maybe_owner_address.is_none() { - if let Some(token_claimed) = tokens_claimed.get(&token_data_id) { - maybe_owner_address = token_claimed.from_address.clone(); - } - } - - let owner_address = maybe_owner_address.unwrap_or_else(|| { - panic!( - "Missing table handle metadata for claim. \ - Version: {}, table handle for PendingClaims: {}, all metadata: {:?} \ - Missing token data id in token claim event. \ - token_data_id: {}, all token claim events: {:?}", - txn_version, table_handle, table_handle_to_owner, token_data_id, tokens_claimed - ) - }); - - let token_id = offer.token_id.clone(); - let token_data_id_struct = token_id.token_data_id; - let collection_data_id_hash = token_data_id_struct.get_collection_data_id_hash(); - let token_data_id_hash = token_data_id_struct.to_hash(); - // Basically adding 0x prefix to the previous 2 lines. This is to be consistent with Token V2 - let collection_id = token_data_id_struct.get_collection_id(); - let token_data_id = token_data_id_struct.to_id(); - let collection_name = token_data_id_struct.get_collection_trunc(); - let name = token_data_id_struct.get_name_trunc(); - - return Ok(Some(Self { - token_data_id_hash, - property_version: token_id.property_version, - from_address: owner_address, - to_address: offer.get_to_address(), - collection_data_id_hash, - creator_address: token_data_id_struct.get_creator_address(), - collection_name, - name, - amount: BigDecimal::zero(), - table_handle, - last_transaction_version: txn_version, - last_transaction_timestamp: txn_timestamp, - token_data_id, - collection_id, - })); +impl CurrentTokenPendingClaimConvertible for CurrentTokenPendingClaim { + fn from_raw(raw_item: RawCurrentTokenPendingClaim) -> Self { + Self { + token_data_id_hash: raw_item.token_data_id_hash, + property_version: raw_item.property_version, + from_address: raw_item.from_address, + to_address: raw_item.to_address, + collection_data_id_hash: raw_item.collection_data_id_hash, + creator_address: raw_item.creator_address, + collection_name: raw_item.collection_name, + name: raw_item.name, + amount: raw_item.amount, + table_handle: raw_item.table_handle, + last_transaction_version: raw_item.last_transaction_version, + last_transaction_timestamp: raw_item.last_transaction_timestamp, + token_data_id: raw_item.token_data_id, + collection_id: raw_item.collection_id, } - Ok(None) } } diff --git a/rust/processor/src/db/postgres/models/token_v2_models/v2_token_activities.rs b/rust/processor/src/db/postgres/models/token_v2_models/v2_token_activities.rs index 4be9247c8..75118367a 100644 --- a/rust/processor/src/db/postgres/models/token_v2_models/v2_token_activities.rs +++ b/rust/processor/src/db/postgres/models/token_v2_models/v2_token_activities.rs @@ -7,11 +7,11 @@ use super::v2_token_utils::{TokenStandard, V2TokenEvent}; use crate::{ - db::postgres::models::{ - object_models::v2_object_utils::ObjectAggregatedDataMapping, - token_models::{ - token_claims::TokenV1Claimed, - token_utils::{TokenDataIdType, TokenEvent}, + db::{ + common::models::token_v2_models::raw_token_claims::TokenV1Claimed, + postgres::models::{ + object_models::v2_object_utils::ObjectAggregatedDataMapping, + token_models::token_utils::{TokenDataIdType, TokenEvent}, }, }, schema::token_activities_v2, diff --git a/rust/processor/src/processors/token_v2_processor.rs b/rust/processor/src/processors/token_v2_processor.rs index a13716222..c658c4795 100644 --- a/rust/processor/src/processors/token_v2_processor.rs +++ b/rust/processor/src/processors/token_v2_processor.rs @@ -3,28 +3,33 @@ use super::{DefaultProcessingResult, ProcessorName, ProcessorTrait}; use crate::{ - db::postgres::models::{ - fungible_asset_models::v2_fungible_asset_utils::FungibleAssetMetadata, - object_models::v2_object_utils::{ - ObjectAggregatedData, ObjectAggregatedDataMapping, ObjectWithMetadata, + db::{ + common::models::token_v2_models::raw_token_claims::{ + CurrentTokenPendingClaimConvertible, RawCurrentTokenPendingClaim, TokenV1Claimed, }, - resources::{FromWriteResource, V2TokenResource}, - token_models::{ - token_claims::{CurrentTokenPendingClaim, TokenV1Claimed}, - tokens::{CurrentTokenPendingClaimPK, TableHandleToOwner, TableMetadataForToken}, - }, - token_v2_models::{ - v1_token_royalty::CurrentTokenRoyaltyV1, - v2_collections::{CollectionV2, CurrentCollectionV2, CurrentCollectionV2PK}, - v2_token_activities::TokenActivityV2, - v2_token_datas::{CurrentTokenDataV2, CurrentTokenDataV2PK, TokenDataV2}, - v2_token_metadata::{CurrentTokenV2Metadata, CurrentTokenV2MetadataPK}, - v2_token_ownerships::{ - CurrentTokenOwnershipV2, CurrentTokenOwnershipV2PK, NFTOwnershipV2, - TokenOwnershipV2, + postgres::models::{ + fungible_asset_models::v2_fungible_asset_utils::FungibleAssetMetadata, + object_models::v2_object_utils::{ + ObjectAggregatedData, ObjectAggregatedDataMapping, ObjectWithMetadata, }, - v2_token_utils::{ - Burn, BurnEvent, Mint, MintEvent, TokenV2Burned, TokenV2Minted, TransferEvent, + resources::{FromWriteResource, V2TokenResource}, + token_models::{ + token_claims::CurrentTokenPendingClaim, + tokens::{CurrentTokenPendingClaimPK, TableHandleToOwner, TableMetadataForToken}, + }, + token_v2_models::{ + v1_token_royalty::CurrentTokenRoyaltyV1, + v2_collections::{CollectionV2, CurrentCollectionV2, CurrentCollectionV2PK}, + v2_token_activities::TokenActivityV2, + v2_token_datas::{CurrentTokenDataV2, CurrentTokenDataV2PK, TokenDataV2}, + v2_token_metadata::{CurrentTokenV2Metadata, CurrentTokenV2MetadataPK}, + v2_token_ownerships::{ + CurrentTokenOwnershipV2, CurrentTokenOwnershipV2PK, NFTOwnershipV2, + TokenOwnershipV2, + }, + v2_token_utils::{ + Burn, BurnEvent, Mint, MintEvent, TokenV2Burned, TokenV2Minted, TransferEvent, + }, }, }, }, @@ -613,7 +618,7 @@ impl ProcessorTrait for TokenV2Processor { token_activities_v2, mut current_token_v2_metadata, current_token_royalties_v1, - current_token_claims, + raw_current_token_claims, ) = parse_v2_token( &transactions, &table_handle_to_owner, @@ -623,6 +628,11 @@ impl ProcessorTrait for TokenV2Processor { ) .await; + let postgres_current_token_claims: Vec = raw_current_token_claims + .into_iter() + .map(CurrentTokenPendingClaim::from_raw) + .collect(); + let processing_duration_in_secs = processing_start.elapsed().as_secs_f64(); let db_insertion_start = std::time::Instant::now(); @@ -662,7 +672,7 @@ impl ProcessorTrait for TokenV2Processor { &token_activities_v2, ¤t_token_v2_metadata, ¤t_token_royalties_v1, - ¤t_token_claims, + &postgres_current_token_claims, &self.per_table_chunk_sizes, ) .await; @@ -714,7 +724,7 @@ pub async fn parse_v2_token( Vec, Vec, Vec, - Vec, + Vec, ) { // Token V2 and V1 combined let mut collections_v2 = vec![]; @@ -747,7 +757,7 @@ pub async fn parse_v2_token( // migrating this from v1 token model as we don't have any replacement table for this let mut all_current_token_claims: AHashMap< CurrentTokenPendingClaimPK, - CurrentTokenPendingClaim, + RawCurrentTokenPendingClaim, > = AHashMap::new(); // Code above is inefficient (multiple passthroughs) so I'm approaching TokenV2 with a cleaner code structure @@ -1001,7 +1011,7 @@ pub async fn parse_v2_token( } } if let Some(current_token_token_claim) = - CurrentTokenPendingClaim::from_write_table_item( + RawCurrentTokenPendingClaim::from_write_table_item( table_item, txn_version, txn_timestamp, @@ -1053,7 +1063,7 @@ pub async fn parse_v2_token( } } if let Some(current_token_token_claim) = - CurrentTokenPendingClaim::from_delete_table_item( + RawCurrentTokenPendingClaim::from_delete_table_item( table_item, txn_version, txn_timestamp, @@ -1287,7 +1297,7 @@ pub async fn parse_v2_token( .collect::>(); let mut all_current_token_claims = all_current_token_claims .into_values() - .collect::>(); + .collect::>(); // Sort by PK current_collections_v2.sort_by(|a, b| a.collection_id.cmp(&b.collection_id)); current_deleted_token_datas_v2.sort_by(|a, b| a.token_data_id.cmp(&b.token_data_id)); diff --git a/rust/sdk-processor/src/config/indexer_processor_config.rs b/rust/sdk-processor/src/config/indexer_processor_config.rs index 13d2b33c8..6b4e5943f 100644 --- a/rust/sdk-processor/src/config/indexer_processor_config.rs +++ b/rust/sdk-processor/src/config/indexer_processor_config.rs @@ -8,6 +8,7 @@ use crate::{ parquet_default_processor::ParquetDefaultProcessor, parquet_events_processor::ParquetEventsProcessor, parquet_fungible_asset_processor::ParquetFungibleAssetProcessor, + parquet_token_v2_processor::ParquetTokenV2Processor, parquet_transaction_metadata_processor::ParquetTransactionMetadataProcessor, parquet_user_transaction_processor::ParquetUserTransactionsProcessor, }, @@ -112,6 +113,10 @@ impl RunnableConfig for IndexerProcessorConfig { ParquetAccountTransactionsProcessor::new(self.clone()).await?; parquet_account_transactions_processor.run_processor().await }, + ProcessorConfig::ParquetTokenV2Processor(_) => { + let parquet_token_v2_processor = ParquetTokenV2Processor::new(self.clone()).await?; + parquet_token_v2_processor.run_processor().await + }, } } diff --git a/rust/sdk-processor/src/config/processor_config.rs b/rust/sdk-processor/src/config/processor_config.rs index 52ef9cd6e..b9e04be39 100644 --- a/rust/sdk-processor/src/config/processor_config.rs +++ b/rust/sdk-processor/src/config/processor_config.rs @@ -27,6 +27,7 @@ use processor::{ }, parquet_v2_fungible_metadata::FungibleAssetMetadataModel, }, + token_v2_models::token_claims::CurrentTokenPendingClaim, transaction_metadata_model::parquet_write_set_size_info::WriteSetSize, user_transaction_models::parquet_user_transactions::UserTransaction, }, @@ -85,6 +86,7 @@ pub enum ProcessorConfig { ParquetFungibleAssetProcessor(ParquetDefaultProcessorConfig), ParquetTransactionMetadataProcessor(ParquetDefaultProcessorConfig), ParquetAccountTransactionsProcessor(ParquetDefaultProcessorConfig), + ParquetTokenV2Processor(ParquetDefaultProcessorConfig), } impl ProcessorConfig { @@ -105,6 +107,7 @@ impl ProcessorConfig { | ProcessorConfig::ParquetUserTransactionsProcessor(config) | ProcessorConfig::ParquetTransactionMetadataProcessor(config) | ProcessorConfig::ParquetAccountTransactionsProcessor(config) + | ProcessorConfig::ParquetTokenV2Processor(config) | ProcessorConfig::ParquetFungibleAssetProcessor(config) => { // Get the processor name as a prefix let processor_name = self.name(); @@ -163,6 +166,9 @@ impl ProcessorConfig { ProcessorName::ParquetAccountTransactionsProcessor => { HashSet::from([AccountTransaction::TABLE_NAME.to_string()]) }, + ProcessorName::ParquetTokenV2Processor => { + HashSet::from([CurrentTokenPendingClaim::TABLE_NAME.to_string()]) + }, _ => HashSet::new(), // Default case for unsupported processors } } diff --git a/rust/sdk-processor/src/parquet_processors/mod.rs b/rust/sdk-processor/src/parquet_processors/mod.rs index 1d5027cca..f9ec72902 100644 --- a/rust/sdk-processor/src/parquet_processors/mod.rs +++ b/rust/sdk-processor/src/parquet_processors/mod.rs @@ -12,31 +12,30 @@ use enum_dispatch::enum_dispatch; use google_cloud_storage::client::{Client as GCSClient, ClientConfig as GcsClientConfig}; use parquet::schema::types::Type; use processor::{ - db::{ - parquet::models::{ - account_transaction_models::parquet_account_transactions::AccountTransaction, - default_models::{ - parquet_block_metadata_transactions::BlockMetadataTransaction, - parquet_move_modules::MoveModule, - parquet_move_resources::MoveResource, - parquet_move_tables::{CurrentTableItem, TableItem}, - parquet_table_metadata::TableMetadata, - parquet_transactions::Transaction as ParquetTransaction, - parquet_write_set_changes::WriteSetChangeModel, - }, - event_models::parquet_events::Event, - fungible_asset_models::{ - parquet_v2_fungible_asset_activities::FungibleAssetActivity, - parquet_v2_fungible_asset_balances::{ - CurrentFungibleAssetBalance, CurrentUnifiedFungibleAssetBalance, - FungibleAssetBalance, - }, - parquet_v2_fungible_metadata::FungibleAssetMetadataModel, + db::parquet::models::{ + account_transaction_models::parquet_account_transactions::AccountTransaction, + ans_models::parquet_ans_primary_name_v2::AnsPrimaryNameV2, + default_models::{ + parquet_block_metadata_transactions::BlockMetadataTransaction, + parquet_move_modules::MoveModule, + parquet_move_resources::MoveResource, + parquet_move_tables::{CurrentTableItem, TableItem}, + parquet_table_metadata::TableMetadata, + parquet_transactions::Transaction as ParquetTransaction, + parquet_write_set_changes::WriteSetChangeModel, + }, + event_models::parquet_events::Event, + fungible_asset_models::{ + parquet_v2_fungible_asset_activities::FungibleAssetActivity, + parquet_v2_fungible_asset_balances::{ + CurrentFungibleAssetBalance, CurrentUnifiedFungibleAssetBalance, + FungibleAssetBalance, }, - transaction_metadata_model::parquet_write_set_size_info::WriteSetSize, - user_transaction_models::parquet_user_transactions::UserTransaction, + parquet_v2_fungible_metadata::FungibleAssetMetadataModel, }, - postgres::models::ans_models::parquet_ans_lookup_v2::AnsPrimaryNameV2, + token_v2_models::token_claims::CurrentTokenPendingClaim, + transaction_metadata_model::parquet_write_set_size_info::WriteSetSize, + user_transaction_models::parquet_user_transactions::UserTransaction, }, utils::table_flags::TableFlags, }; @@ -54,6 +53,7 @@ pub mod parquet_ans_processor; pub mod parquet_default_processor; pub mod parquet_events_processor; pub mod parquet_fungible_asset_processor; +pub mod parquet_token_v2_processor; pub mod parquet_transaction_metadata_processor; pub mod parquet_user_transaction_processor; @@ -107,6 +107,8 @@ pub enum ParquetTypeEnum { WriteSetSize, // account transactions AccountTransactions, + // token v2 + CurrentTokenPendingClaims, } /// Trait for handling various Parquet types. @@ -185,6 +187,10 @@ impl_parquet_trait!( ); impl_parquet_trait!(WriteSetSize, ParquetTypeEnum::WriteSetSize); impl_parquet_trait!(AccountTransaction, ParquetTypeEnum::AccountTransactions); +impl_parquet_trait!( + CurrentTokenPendingClaim, + ParquetTypeEnum::CurrentTokenPendingClaims +); #[derive(Debug, Clone)] #[enum_dispatch(ParquetTypeTrait)] @@ -207,6 +213,7 @@ pub enum ParquetTypeStructs { CurrentUnifiedFungibleAssetBalance(Vec), WriteSetSize(Vec), AccountTransaction(Vec), + CurrentTokenPendingClaim(Vec), } impl ParquetTypeStructs { @@ -244,6 +251,9 @@ impl ParquetTypeStructs { ParquetTypeEnum::AccountTransactions => { ParquetTypeStructs::AccountTransaction(Vec::new()) }, + ParquetTypeEnum::CurrentTokenPendingClaims => { + ParquetTypeStructs::CurrentTokenPendingClaim(Vec::new()) + }, } } @@ -362,6 +372,12 @@ impl ParquetTypeStructs { ) => { handle_append!(self_data, other_data) }, + ( + ParquetTypeStructs::CurrentTokenPendingClaim(self_data), + ParquetTypeStructs::CurrentTokenPendingClaim(other_data), + ) => { + handle_append!(self_data, other_data) + }, _ => Err(ProcessorError::ProcessError { message: "Mismatched buffer types in append operation".to_string(), }), diff --git a/rust/sdk-processor/src/parquet_processors/parquet_token_v2_processor.rs b/rust/sdk-processor/src/parquet_processors/parquet_token_v2_processor.rs new file mode 100644 index 000000000..3d9215565 --- /dev/null +++ b/rust/sdk-processor/src/parquet_processors/parquet_token_v2_processor.rs @@ -0,0 +1,182 @@ +use crate::{ + config::{ + db_config::DbConfig, indexer_processor_config::IndexerProcessorConfig, + processor_config::ProcessorConfig, + }, + parquet_processors::{ + initialize_database_pool, initialize_gcs_client, initialize_parquet_buffer_step, + set_backfill_table_flag, ParquetTypeEnum, + }, + steps::{ + common::{ + parquet_version_tracker_step::ParquetVersionTrackerStep, + processor_status_saver::get_processor_status_saver, + }, + parquet_token_v2_processor::parquet_token_v2_extractor::ParquetTokenV2Extractor, + }, + utils::{ + chain_id::check_or_update_chain_id, + database::{run_migrations, ArcDbPool}, + starting_version::get_min_last_success_version_parquet, + }, +}; +use anyhow::Context; +use aptos_indexer_processor_sdk::{ + aptos_indexer_transaction_stream::{TransactionStream, TransactionStreamConfig}, + builder::ProcessorBuilder, + common_steps::{TransactionStreamStep, DEFAULT_UPDATE_PROCESSOR_STATUS_SECS}, + traits::{processor_trait::ProcessorTrait, IntoRunnableStep}, +}; +use parquet::schema::types::Type; +use processor::{ + bq_analytics::generic_parquet_processor::HasParquetSchema, + db::parquet::models::token_v2_models::token_claims::CurrentTokenPendingClaim, +}; +use std::{collections::HashMap, sync::Arc}; +use tracing::{debug, info}; + +pub struct ParquetTokenV2Processor { + pub config: IndexerProcessorConfig, + pub db_pool: ArcDbPool, +} + +impl ParquetTokenV2Processor { + pub async fn new(config: IndexerProcessorConfig) -> anyhow::Result { + let db_pool = initialize_database_pool(&config.db_config).await?; + Ok(Self { config, db_pool }) + } +} + +#[async_trait::async_trait] +impl ProcessorTrait for ParquetTokenV2Processor { + fn name(&self) -> &'static str { + self.config.processor_config.name() + } + + async fn run_processor(&self) -> anyhow::Result<()> { + // Run Migrations + let parquet_db_config = match self.config.db_config { + DbConfig::ParquetConfig(ref parquet_config) => { + run_migrations( + parquet_config.connection_string.clone(), + self.db_pool.clone(), + ) + .await; + parquet_config + }, + _ => { + return Err(anyhow::anyhow!( + "Invalid db config for ParquetTokenV2Processor {:?}", + self.config.db_config + )); + }, + }; + + // Check and update the ledger chain id to ensure we're indexing the correct chain + let grpc_chain_id = TransactionStream::new(self.config.transaction_stream_config.clone()) + .await? + .get_chain_id() + .await?; + check_or_update_chain_id(grpc_chain_id as i64, self.db_pool.clone()).await?; + + let parquet_processor_config = match self.config.processor_config.clone() { + ProcessorConfig::ParquetTokenV2Processor(parquet_processor_config) => { + parquet_processor_config + }, + _ => { + return Err(anyhow::anyhow!( + "Invalid processor configuration for ParquetTokenV2Processor {:?}", + self.config.processor_config + )); + }, + }; + + let processor_status_table_names = self + .config + .processor_config + .get_processor_status_table_names() + .context("Failed to get table names for the processor status table")?; + + let starting_version = get_min_last_success_version_parquet( + &self.config, + self.db_pool.clone(), + processor_status_table_names, + ) + .await?; + println!("Starting version: {:?}", starting_version); + + // Define processor transaction stream config + let transaction_stream = TransactionStreamStep::new(TransactionStreamConfig { + starting_version: Some(starting_version), + ..self.config.transaction_stream_config.clone() + }) + .await?; + + let backfill_table = set_backfill_table_flag(parquet_processor_config.backfill_table); + // TODO: Update this + let parquet_token_v2_extractor = + ParquetTokenV2Extractor::new(backfill_table, 5, 500, self.db_pool.clone()); + + let gcs_client = + initialize_gcs_client(parquet_db_config.google_application_credentials.clone()).await; + + // TODO: Update this + let parquet_type_to_schemas: HashMap> = [( + ParquetTypeEnum::CurrentTokenPendingClaims, + CurrentTokenPendingClaim::schema(), + )] + .into_iter() + .collect(); + + let default_size_buffer_step = initialize_parquet_buffer_step( + gcs_client.clone(), + parquet_type_to_schemas, + parquet_processor_config.upload_interval, + parquet_processor_config.max_buffer_size, + parquet_db_config.bucket_name.clone(), + parquet_db_config.bucket_root.clone(), + self.name().to_string(), + ) + .await + .unwrap_or_else(|e| { + panic!("Failed to initialize parquet buffer step: {:?}", e); + }); + + let parquet_version_tracker_step = ParquetVersionTrackerStep::new( + get_processor_status_saver(self.db_pool.clone(), self.config.clone()), + DEFAULT_UPDATE_PROCESSOR_STATUS_SECS, + ); + + let channel_size = parquet_processor_config.channel_size; + + // Connect processor steps together + let (_, buffer_receiver) = ProcessorBuilder::new_with_inputless_first_step( + transaction_stream.into_runnable_step(), + ) + .connect_to( + parquet_token_v2_extractor.into_runnable_step(), + channel_size, + ) + .connect_to(default_size_buffer_step.into_runnable_step(), channel_size) + .connect_to( + parquet_version_tracker_step.into_runnable_step(), + channel_size, + ) + .end_and_return_output_receiver(channel_size); + + loop { + match buffer_receiver.recv().await { + Ok(txn_context) => { + debug!( + "Finished processing versions [{:?}, {:?}]", + txn_context.metadata.start_version, txn_context.metadata.end_version, + ); + }, + Err(e) => { + info!("No more transactions in channel: {:?}", e); + break Ok(()); + }, + } + } + } +} diff --git a/rust/sdk-processor/src/steps/mod.rs b/rust/sdk-processor/src/steps/mod.rs index 04b2b90dd..5f0f944ee 100644 --- a/rust/sdk-processor/src/steps/mod.rs +++ b/rust/sdk-processor/src/steps/mod.rs @@ -14,6 +14,7 @@ pub mod parquet_account_transactions_processor; pub mod parquet_default_processor; pub mod parquet_events_processor; pub mod parquet_fungible_asset_processor; +pub mod parquet_token_v2_processor; pub mod parquet_transaction_metadata_processor; pub mod parquet_user_transaction_processor; diff --git a/rust/sdk-processor/src/steps/parquet_token_v2_processor/mod.rs b/rust/sdk-processor/src/steps/parquet_token_v2_processor/mod.rs new file mode 100644 index 000000000..94f6e5e7d --- /dev/null +++ b/rust/sdk-processor/src/steps/parquet_token_v2_processor/mod.rs @@ -0,0 +1 @@ +pub mod parquet_token_v2_extractor; diff --git a/rust/sdk-processor/src/steps/parquet_token_v2_processor/parquet_token_v2_extractor.rs b/rust/sdk-processor/src/steps/parquet_token_v2_processor/parquet_token_v2_extractor.rs new file mode 100644 index 000000000..588dc4b3c --- /dev/null +++ b/rust/sdk-processor/src/steps/parquet_token_v2_processor/parquet_token_v2_extractor.rs @@ -0,0 +1,136 @@ +use crate::{ + parquet_processors::{ParquetTypeEnum, ParquetTypeStructs}, + utils::{database::ArcDbPool, parquet_extractor_helper::add_to_map_if_opted_in_for_backfill}, +}; +use aptos_indexer_processor_sdk::{ + aptos_protos::transaction::v1::Transaction, + traits::{async_step::AsyncRunType, AsyncStep, NamedStep, Processable}, + types::transaction_context::TransactionContext, + utils::errors::ProcessorError, +}; +use async_trait::async_trait; +use processor::{ + db::{ + common::models::token_v2_models::raw_token_claims::CurrentTokenPendingClaimConvertible, + parquet::models::token_v2_models::token_claims::CurrentTokenPendingClaim, + postgres::models::token_models::tokens::TableMetadataForToken, + }, + processors::token_v2_processor::parse_v2_token, + utils::table_flags::TableFlags, +}; +use std::collections::HashMap; +use tracing::debug; + +/// Extracts parquet data from transactions, allowing optional selection of specific tables. +pub struct ParquetTokenV2Extractor +where + Self: Processable + Send + Sized + 'static, +{ + pub opt_in_tables: TableFlags, + // TODO: Revisit and remove + query_retries: u32, + query_retry_delay_ms: u64, + conn_pool: ArcDbPool, +} + +impl ParquetTokenV2Extractor { + pub fn new( + opt_in_tables: TableFlags, + query_retries: u32, + query_retry_delay_ms: u64, + conn_pool: ArcDbPool, + ) -> Self { + Self { + opt_in_tables, + query_retries, + query_retry_delay_ms, + conn_pool, + } + } +} +type ParquetTypeMap = HashMap; + +#[async_trait] +impl Processable for ParquetTokenV2Extractor { + type Input = Vec; + type Output = ParquetTypeMap; + type RunType = AsyncRunType; + + async fn process( + &mut self, + transactions: TransactionContext, + ) -> anyhow::Result>, ProcessorError> { + let mut conn = self + .conn_pool + .get() + .await + .map_err(|e| ProcessorError::DBStoreError { + message: format!("Failed to get connection from pool: {:?}", e), + query: None, + })?; + + // First get all token related table metadata from the batch of transactions. This is in case + // an earlier transaction has metadata (in resources) that's missing from a later transaction. + let table_handle_to_owner: ahash::AHashMap = + TableMetadataForToken::get_table_handle_to_owner_from_transactions(&transactions.data); + + let ( + _collections_v2, + _token_datas_v2, + _token_ownerships_v2, + _current_collections_v2, + _current_token_datas_v2, + _current_deleted_token_datas_v2, + _current_token_ownerships_v2, + _current_deleted_token_ownerships_v2, + _token_activities_v2, + _current_token_v2_metadata, + _current_token_royalties_v1, + raw_current_token_claims, + ) = parse_v2_token( + &transactions.data, + &table_handle_to_owner, + &mut conn, + self.query_retries, + self.query_retry_delay_ms, + ) + .await; + + let parquet_current_token_claims: Vec = raw_current_token_claims + .into_iter() + .map(CurrentTokenPendingClaim::from_raw) + .collect(); + + // Print the size of each extracted data type + debug!("Processed data sizes:"); + debug!( + " - CurrentTokenPendingClaim: {}", + parquet_current_token_claims.len() + ); + + let mut map: HashMap = HashMap::new(); + + // Array of tuples for each data type and its corresponding enum variant and flag + let data_types = [( + TableFlags::CURRENT_TOKEN_PENDING_CLAIMS, + ParquetTypeEnum::CurrentTokenPendingClaims, + ParquetTypeStructs::CurrentTokenPendingClaim(parquet_current_token_claims), + )]; + + // Populate the map based on opt-in tables + add_to_map_if_opted_in_for_backfill(self.opt_in_tables, &mut map, data_types.to_vec()); + + Ok(Some(TransactionContext { + data: map, + metadata: transactions.metadata, + })) + } +} + +impl AsyncStep for ParquetTokenV2Extractor {} + +impl NamedStep for ParquetTokenV2Extractor { + fn name(&self) -> String { + "ParquetTokenV2Extractor".to_string() + } +} diff --git a/rust/sdk-processor/src/steps/token_v2_processor/token_v2_extractor.rs b/rust/sdk-processor/src/steps/token_v2_processor/token_v2_extractor.rs index c08c9444f..4e29e0f5d 100644 --- a/rust/sdk-processor/src/steps/token_v2_processor/token_v2_extractor.rs +++ b/rust/sdk-processor/src/steps/token_v2_processor/token_v2_extractor.rs @@ -7,15 +7,18 @@ use aptos_indexer_processor_sdk::{ }; use async_trait::async_trait; use processor::{ - db::postgres::models::{ - token_models::{token_claims::CurrentTokenPendingClaim, tokens::TableMetadataForToken}, - token_v2_models::{ - v1_token_royalty::CurrentTokenRoyaltyV1, - v2_collections::{CollectionV2, CurrentCollectionV2}, - v2_token_activities::TokenActivityV2, - v2_token_datas::{CurrentTokenDataV2, TokenDataV2}, - v2_token_metadata::CurrentTokenV2Metadata, - v2_token_ownerships::{CurrentTokenOwnershipV2, TokenOwnershipV2}, + db::{ + common::models::token_v2_models::raw_token_claims::CurrentTokenPendingClaimConvertible, + postgres::models::{ + token_models::{token_claims::CurrentTokenPendingClaim, tokens::TableMetadataForToken}, + token_v2_models::{ + v1_token_royalty::CurrentTokenRoyaltyV1, + v2_collections::{CollectionV2, CurrentCollectionV2}, + v2_token_activities::TokenActivityV2, + v2_token_datas::{CurrentTokenDataV2, TokenDataV2}, + v2_token_metadata::CurrentTokenV2Metadata, + v2_token_ownerships::{CurrentTokenOwnershipV2, TokenOwnershipV2}, + }, }, }, processors::token_v2_processor::parse_v2_token, @@ -108,7 +111,7 @@ impl Processable for TokenV2Extractor { token_activities_v2, current_token_v2_metadata, current_token_royalties_v1, - current_token_claims, + raw_current_token_claims, ) = parse_v2_token( &transactions.data, &table_handle_to_owner, @@ -118,6 +121,11 @@ impl Processable for TokenV2Extractor { ) .await; + let postgres_current_token_claims: Vec = raw_current_token_claims + .into_iter() + .map(CurrentTokenPendingClaim::from_raw) + .collect(); + Ok(Some(TransactionContext { data: ( collections_v2, @@ -131,7 +139,7 @@ impl Processable for TokenV2Extractor { token_activities_v2, current_token_v2_metadata, current_token_royalties_v1, - current_token_claims, + postgres_current_token_claims, ), metadata: transactions.metadata, }))