diff --git a/rust/processor/src/db/common/models/token_v2_models/mod.rs b/rust/processor/src/db/common/models/token_v2_models/mod.rs
index e64a6221..64224958 100644
--- a/rust/processor/src/db/common/models/token_v2_models/mod.rs
+++ b/rust/processor/src/db/common/models/token_v2_models/mod.rs
@@ -1,2 +1,3 @@
 pub mod raw_token_claims;
 pub mod raw_v1_token_royalty;
+pub mod raw_v2_token_metadata;
diff --git a/rust/processor/src/db/common/models/token_v2_models/raw_v2_token_metadata.rs b/rust/processor/src/db/common/models/token_v2_models/raw_v2_token_metadata.rs
new file mode 100644
index 00000000..76cb89e2
--- /dev/null
+++ b/rust/processor/src/db/common/models/token_v2_models/raw_v2_token_metadata.rs
@@ -0,0 +1,96 @@
+// Copyright © Aptos Foundation
+// SPDX-License-Identifier: Apache-2.0
+
+// This is required because a diesel macro makes clippy sad
+#![allow(clippy::extra_unused_lifetimes)]
+#![allow(clippy::unused_unit)]
+
+use crate::{
+    db::postgres::models::{
+        default_models::move_resources::MoveResource,
+        object_models::v2_object_utils::ObjectAggregatedDataMapping,
+        resources::{COIN_ADDR, TOKEN_ADDR, TOKEN_V2_ADDR},
+        token_models::token_utils::NAME_LENGTH,
+    },
+    utils::util::{standardize_address, truncate_str},
+};
+use anyhow::Context;
+use aptos_protos::transaction::v1::WriteResource;
+use serde::{Deserialize, Serialize};
+use serde_json::Value;
+
+// PK of current_objects, i.e. object_address, resource_type
+pub type CurrentTokenV2MetadataPK = (String, String);
+
+#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
+pub struct RawCurrentTokenV2Metadata {
+    pub object_address: String,
+    pub resource_type: String,
+    pub data: Value,
+    pub state_key_hash: String,
+    pub last_transaction_version: i64,
+    pub last_transaction_timestamp: chrono::NaiveDateTime,
+}
+
+impl Ord for RawCurrentTokenV2Metadata {
+    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
+        self.object_address
+            .cmp(&other.object_address)
+            .then(self.resource_type.cmp(&other.resource_type))
+    }
+}
+impl PartialOrd for RawCurrentTokenV2Metadata {
+    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
+        Some(self.cmp(other))
+    }
+}
+
+impl RawCurrentTokenV2Metadata {
+    /// Parsing unknown resources with 0x4::token::Token
+    pub fn from_write_resource(
+        write_resource: &WriteResource,
+        txn_version: i64,
+        object_metadatas: &ObjectAggregatedDataMapping,
+        txn_timestamp: chrono::NaiveDateTime,
+    ) -> anyhow::Result<Option<Self>> {
+        let object_address = standardize_address(&write_resource.address.to_string());
+        if let Some(object_data) = object_metadatas.get(&object_address) {
+            // checking if token_v2
+            if object_data.token.is_some() {
+                let move_tag =
+                    MoveResource::convert_move_struct_tag(write_resource.r#type.as_ref().unwrap());
+                let resource_type_addr = move_tag.get_address();
+                if matches!(
+                    resource_type_addr.as_str(),
+                    COIN_ADDR | TOKEN_ADDR | TOKEN_V2_ADDR
+                ) {
+                    return Ok(None);
+                }
+
+                let resource = MoveResource::from_write_resource(write_resource, 0, txn_version, 0);
+
+                let state_key_hash = object_data.object.get_state_key_hash();
+                if state_key_hash != resource.state_key_hash {
+                    return Ok(None);
+                }
+
+                let resource_type = truncate_str(&resource.type_, NAME_LENGTH);
+                return Ok(Some(RawCurrentTokenV2Metadata {
+                    object_address,
+                    resource_type,
+                    data: resource
+                        .data
+                        .context("data must be present in write resource")?,
+                    state_key_hash: resource.state_key_hash,
+                    last_transaction_version: txn_version,
+                    last_transaction_timestamp: txn_timestamp,
+                }));
+            }
+        }
+        Ok(None)
+    }
+}
+
+pub trait CurrentTokenV2MetadataConvertible {
+    fn from_raw(raw_item: RawCurrentTokenV2Metadata) -> Self;
+}
diff --git a/rust/processor/src/db/parquet/models/token_v2_models/mod.rs b/rust/processor/src/db/parquet/models/token_v2_models/mod.rs
index d42cead4..947fabf7 100644
--- a/rust/processor/src/db/parquet/models/token_v2_models/mod.rs
+++ b/rust/processor/src/db/parquet/models/token_v2_models/mod.rs
@@ -1,2 +1,3 @@
 pub mod token_claims;
 pub mod v1_token_royalty;
+pub mod v2_token_metadata;
diff --git a/rust/processor/src/db/parquet/models/token_v2_models/v2_token_metadata.rs b/rust/processor/src/db/parquet/models/token_v2_models/v2_token_metadata.rs
new file mode 100644
index 00000000..058b300e
--- /dev/null
+++ b/rust/processor/src/db/parquet/models/token_v2_models/v2_token_metadata.rs
@@ -0,0 +1,59 @@
+// Copyright © Aptos Foundation
+// SPDX-License-Identifier: Apache-2.0
+
+// This is required because a diesel macro makes clippy sad
+#![allow(clippy::extra_unused_lifetimes)]
+#![allow(clippy::unused_unit)]
+
+use crate::{
+    bq_analytics::generic_parquet_processor::{GetTimeStamp, HasVersion, NamedTable},
+    db::common::models::token_v2_models::raw_v2_token_metadata::{
+        CurrentTokenV2MetadataConvertible, RawCurrentTokenV2Metadata,
+    },
+};
+use allocative_derive::Allocative;
+use field_count::FieldCount;
+use parquet_derive::ParquetRecordWriter;
+use serde::{Deserialize, Serialize};
+
+#[derive(
+    Allocative, Clone, Debug, Default, Deserialize, FieldCount, ParquetRecordWriter, Serialize,
+)]
+pub struct CurrentTokenV2Metadata {
+    pub object_address: String,
+    pub resource_type: String,
+    pub data: String,
+    pub state_key_hash: String,
+    pub last_transaction_version: i64,
+    #[allocative(skip)]
+    pub last_transaction_timestamp: chrono::NaiveDateTime,
+}
+impl NamedTable for CurrentTokenV2Metadata {
+    const TABLE_NAME: &'static str = "current_token_v2_metadata";
+}
+
+impl HasVersion for CurrentTokenV2Metadata {
+    fn version(&self) -> i64 {
+        self.last_transaction_version
+    }
+}
+
+impl GetTimeStamp for CurrentTokenV2Metadata {
+    fn get_timestamp(&self) -> chrono::NaiveDateTime {
+        self.last_transaction_timestamp
+    }
+}
+
+impl CurrentTokenV2MetadataConvertible for CurrentTokenV2Metadata {
+    // TODO: consider returning a Result
+    fn from_raw(raw_item: RawCurrentTokenV2Metadata) -> Self {
+        Self {
+            object_address: raw_item.object_address,
+            resource_type: raw_item.resource_type,
+            data: canonical_json::to_string(&raw_item.data).unwrap(), // TODO: handle better
+            state_key_hash: raw_item.state_key_hash,
+            last_transaction_version: raw_item.last_transaction_version,
+            last_transaction_timestamp: raw_item.last_transaction_timestamp,
+        }
+    }
+}
diff --git a/rust/processor/src/db/postgres/models/token_v2_models/v2_token_metadata.rs b/rust/processor/src/db/postgres/models/token_v2_models/v2_token_metadata.rs
index c383bc07..39600d66 100644
--- a/rust/processor/src/db/postgres/models/token_v2_models/v2_token_metadata.rs
+++ b/rust/processor/src/db/postgres/models/token_v2_models/v2_token_metadata.rs
@@ -6,17 +6,11 @@
 #![allow(clippy::unused_unit)]
 
 use crate::{
-    db::postgres::models::{
-        default_models::move_resources::MoveResource,
-        object_models::v2_object_utils::ObjectAggregatedDataMapping,
-        resources::{COIN_ADDR, TOKEN_ADDR, TOKEN_V2_ADDR},
-        token_models::token_utils::NAME_LENGTH,
+    db::common::models::token_v2_models::raw_v2_token_metadata::{
+        CurrentTokenV2MetadataConvertible, RawCurrentTokenV2Metadata,
     },
     schema::current_token_v2_metadata,
-    utils::util::{standardize_address, truncate_str},
 };
-use anyhow::Context;
-use aptos_protos::transaction::v1::WriteResource;
 use field_count::FieldCount;
 use serde::{Deserialize, Serialize};
 use serde_json::Value;
@@ -24,9 +18,7 @@ use serde_json::Value;
 // PK of current_objects, i.e. object_address, resource_type
 pub type CurrentTokenV2MetadataPK = (String, String);
 
-#[derive(
-    Clone, Debug, Deserialize, Eq, FieldCount, Identifiable, Insertable, PartialEq, Serialize,
-)]
+#[derive(Clone, Debug, Deserialize, FieldCount, Identifiable, Insertable, Serialize)]
 #[diesel(primary_key(object_address, resource_type))]
 #[diesel(table_name = current_token_v2_metadata)]
 pub struct CurrentTokenV2Metadata {
@@ -37,59 +29,14 @@ pub struct CurrentTokenV2Metadata {
     pub last_transaction_version: i64,
 }
 
-impl Ord for CurrentTokenV2Metadata {
-    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
-        self.object_address
-            .cmp(&other.object_address)
-            .then(self.resource_type.cmp(&other.resource_type))
-    }
-}
-impl PartialOrd for CurrentTokenV2Metadata {
-    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
-        Some(self.cmp(other))
-    }
-}
-
-impl CurrentTokenV2Metadata {
-    /// Parsing unknown resources with 0x4::token::Token
-    pub fn from_write_resource(
-        write_resource: &WriteResource,
-        txn_version: i64,
-        object_metadatas: &ObjectAggregatedDataMapping,
-    ) -> anyhow::Result<Option<Self>> {
-        let object_address = standardize_address(&write_resource.address.to_string());
-        if let Some(object_data) = object_metadatas.get(&object_address) {
-            // checking if token_v2
-            if object_data.token.is_some() {
-                let move_tag =
-                    MoveResource::convert_move_struct_tag(write_resource.r#type.as_ref().unwrap());
-                let resource_type_addr = move_tag.get_address();
-                if matches!(
-                    resource_type_addr.as_str(),
-                    COIN_ADDR | TOKEN_ADDR | TOKEN_V2_ADDR
-                ) {
-                    return Ok(None);
-                }
-
-                let resource = MoveResource::from_write_resource(write_resource, 0, txn_version, 0);
-
-                let state_key_hash = object_data.object.get_state_key_hash();
-                if state_key_hash != resource.state_key_hash {
-                    return Ok(None);
-                }
-
-                let resource_type = truncate_str(&resource.type_, NAME_LENGTH);
-                return Ok(Some(CurrentTokenV2Metadata {
-                    object_address,
-                    resource_type,
-                    data: resource
-                        .data
-                        .context("data must be present in write resource")?,
-                    state_key_hash: resource.state_key_hash,
-                    last_transaction_version: txn_version,
-                }));
-            }
+impl CurrentTokenV2MetadataConvertible for CurrentTokenV2Metadata {
+    fn from_raw(raw_item: RawCurrentTokenV2Metadata) -> Self {
+        Self {
+            object_address: raw_item.object_address,
+            resource_type: raw_item.resource_type,
+            data: raw_item.data,
+            state_key_hash: raw_item.state_key_hash,
+            last_transaction_version: raw_item.last_transaction_version,
         }
-        Ok(None)
     }
 }
diff --git a/rust/processor/src/processors/token_v2_processor.rs b/rust/processor/src/processors/token_v2_processor.rs
index 1635190d..25649157 100644
--- a/rust/processor/src/processors/token_v2_processor.rs
+++ b/rust/processor/src/processors/token_v2_processor.rs
@@ -9,6 +9,7 @@ use crate::{
                 CurrentTokenPendingClaimConvertible, RawCurrentTokenPendingClaim, TokenV1Claimed,
             },
             raw_v1_token_royalty::{CurrentTokenRoyaltyV1Convertible, RawCurrentTokenRoyaltyV1},
+            raw_v2_token_metadata::{CurrentTokenV2MetadataConvertible, RawCurrentTokenV2Metadata},
         },
         postgres::models::{
            fungible_asset_models::v2_fungible_asset_utils::FungibleAssetMetadata,
@@ -619,7 +620,7 @@ impl ProcessorTrait for TokenV2Processor {
             current_token_ownerships_v2,
             current_deleted_token_ownerships_v2,
             token_activities_v2,
-            mut current_token_v2_metadata,
+            raw_current_token_v2_metadata,
             raw_current_token_royalties_v1,
             raw_current_token_claims,
         ) = parse_v2_token(
@@ -642,6 +643,12 @@ impl ProcessorTrait for TokenV2Processor {
             .map(CurrentTokenRoyaltyV1::from_raw)
             .collect();
 
+        let mut postgres_current_token_v2_metadata: Vec<CurrentTokenV2Metadata> =
+            raw_current_token_v2_metadata
+                .into_iter()
+                .map(CurrentTokenV2Metadata::from_raw)
+                .collect();
+
         let processing_duration_in_secs = processing_start.elapsed().as_secs_f64();
         let db_insertion_start = std::time::Instant::now();
 
@@ -661,7 +668,7 @@ impl ProcessorTrait for TokenV2Processor {
             .deprecated_tables
             .contains(TableFlags::CURRENT_TOKEN_V2_METADATA)
         {
-            current_token_v2_metadata.clear();
+            postgres_current_token_v2_metadata.clear();
         }
 
         let tx_result = insert_to_db(
@@ -679,7 +686,7 @@ impl ProcessorTrait for TokenV2Processor {
                 &current_deleted_token_ownerships_v2,
             ),
             &token_activities_v2,
-            &current_token_v2_metadata,
+            &postgres_current_token_v2_metadata,
             &postgres_current_token_royalties_v1,
             &postgres_current_token_claims,
             &self.per_table_chunk_sizes,
@@ -731,7 +738,7 @@ pub async fn parse_v2_token(
     Vec<CurrentTokenOwnershipV2>,
     Vec<CurrentTokenOwnershipV2>, // deleted token ownerships
     Vec<TokenActivityV2>,
-    Vec<CurrentTokenV2Metadata>,
+    Vec<RawCurrentTokenV2Metadata>,
     Vec<RawCurrentTokenRoyaltyV1>,
     Vec<RawCurrentTokenPendingClaim>,
 ) {
@@ -759,8 +766,10 @@ pub async fn parse_v2_token(
     // we can still get the object core metadata for it
     let mut token_v2_metadata_helper: ObjectAggregatedDataMapping = AHashMap::new();
     // Basically token properties
-    let mut current_token_v2_metadata: AHashMap<CurrentTokenV2MetadataPK, CurrentTokenV2Metadata> =
-        AHashMap::new();
+    let mut current_token_v2_metadata: AHashMap<
+        CurrentTokenV2MetadataPK,
+        RawCurrentTokenV2Metadata,
+    > = AHashMap::new();
     let mut current_token_royalties_v1: AHashMap<CurrentTokenRoyaltyV1PK, RawCurrentTokenRoyaltyV1> =
         AHashMap::new();
     // migrating this from v1 token model as we don't have any replacement table for this
@@ -1208,12 +1217,14 @@ pub async fn parse_v2_token(
                     }
 
                     // Track token properties
-                    if let Some(token_metadata) = CurrentTokenV2Metadata::from_write_resource(
-                        resource,
-                        txn_version,
-                        &token_v2_metadata_helper,
-                    )
-                    .unwrap()
+                    if let Some(token_metadata) =
+                        RawCurrentTokenV2Metadata::from_write_resource(
+                            resource,
+                            txn_version,
+                            &token_v2_metadata_helper,
+                            txn_timestamp,
+                        )
+                        .unwrap()
                     {
                         current_token_v2_metadata.insert(
                             (
@@ -1297,7 +1308,7 @@ pub async fn parse_v2_token(
         .collect::<Vec<CurrentTokenOwnershipV2>>();
     let mut current_token_v2_metadata = current_token_v2_metadata
         .into_values()
-        .collect::<Vec<CurrentTokenV2Metadata>>();
+        .collect::<Vec<RawCurrentTokenV2Metadata>>();
     let mut current_deleted_token_ownerships_v2 = current_deleted_token_ownerships_v2
         .into_values()
         .collect::<Vec<CurrentTokenOwnershipV2>>();
diff --git a/rust/sdk-processor/src/config/processor_config.rs b/rust/sdk-processor/src/config/processor_config.rs
index 9f3aefb7..59d46be2 100644
--- a/rust/sdk-processor/src/config/processor_config.rs
+++ b/rust/sdk-processor/src/config/processor_config.rs
@@ -29,6 +29,7 @@ use processor::{
         },
         token_v2_models::{
             token_claims::CurrentTokenPendingClaim, v1_token_royalty::CurrentTokenRoyaltyV1,
+            v2_token_metadata::CurrentTokenV2Metadata,
         },
         transaction_metadata_model::parquet_write_set_size_info::WriteSetSize,
         user_transaction_models::parquet_user_transactions::UserTransaction,
@@ -171,6 +172,7 @@ impl ProcessorConfig {
             ProcessorName::ParquetTokenV2Processor => HashSet::from([
                 CurrentTokenPendingClaim::TABLE_NAME.to_string(),
                 CurrentTokenRoyaltyV1::TABLE_NAME.to_string(),
+                CurrentTokenV2Metadata::TABLE_NAME.to_string(),
             ]),
             _ => HashSet::new(), // Default case for unsupported processors
         }
diff --git a/rust/sdk-processor/src/parquet_processors/mod.rs b/rust/sdk-processor/src/parquet_processors/mod.rs
index ff92f6c75..a94965fb 100644
--- a/rust/sdk-processor/src/parquet_processors/mod.rs
+++ b/rust/sdk-processor/src/parquet_processors/mod.rs
@@ -35,6 +35,7 @@ use processor::{
         },
         token_v2_models::{
             token_claims::CurrentTokenPendingClaim, v1_token_royalty::CurrentTokenRoyaltyV1,
+            v2_token_metadata::CurrentTokenV2Metadata,
         },
         transaction_metadata_model::parquet_write_set_size_info::WriteSetSize,
         user_transaction_models::parquet_user_transactions::UserTransaction,
@@ -112,6 +113,7 @@ pub enum ParquetTypeEnum {
     // token v2
     CurrentTokenPendingClaims,
     CurrentTokenRoyaltyV1,
+    CurrentTokenV2Metadata,
 }
 
 /// Trait for handling various Parquet types.
@@ -198,6 +200,10 @@ impl_parquet_trait!(
     CurrentTokenRoyaltyV1,
     ParquetTypeEnum::CurrentTokenRoyaltyV1
 );
+impl_parquet_trait!(
+    CurrentTokenV2Metadata,
+    ParquetTypeEnum::CurrentTokenV2Metadata
+);
 
 #[derive(Debug, Clone)]
 #[enum_dispatch(ParquetTypeTrait)]
@@ -222,6 +228,7 @@ pub enum ParquetTypeStructs {
     AccountTransaction(Vec<AccountTransaction>),
     CurrentTokenPendingClaim(Vec<CurrentTokenPendingClaim>),
     CurrentTokenRoyaltyV1(Vec<CurrentTokenRoyaltyV1>),
+    CurrentTokenV2Metadata(Vec<CurrentTokenV2Metadata>),
 }
 
 impl ParquetTypeStructs {
@@ -265,6 +272,9 @@ impl ParquetTypeStructs {
             ParquetTypeEnum::CurrentTokenRoyaltyV1 => {
                 ParquetTypeStructs::CurrentTokenRoyaltyV1(Vec::new())
             },
+            ParquetTypeEnum::CurrentTokenV2Metadata => {
+                ParquetTypeStructs::CurrentTokenV2Metadata(Vec::new())
+            },
         }
     }
 
@@ -395,6 +405,12 @@ impl ParquetTypeStructs {
             ) => {
                 handle_append!(self_data, other_data)
             },
+            (
+                ParquetTypeStructs::CurrentTokenV2Metadata(self_data),
+                ParquetTypeStructs::CurrentTokenV2Metadata(other_data),
+            ) => {
+                handle_append!(self_data, other_data)
+            },
             _ => Err(ProcessorError::ProcessError {
                 message: "Mismatched buffer types in append operation".to_string(),
             }),
diff --git a/rust/sdk-processor/src/parquet_processors/parquet_token_v2_processor.rs b/rust/sdk-processor/src/parquet_processors/parquet_token_v2_processor.rs
index 27be8cf2..d32a12ba 100644
--- a/rust/sdk-processor/src/parquet_processors/parquet_token_v2_processor.rs
+++ b/rust/sdk-processor/src/parquet_processors/parquet_token_v2_processor.rs
@@ -32,6 +32,7 @@ use processor::{
     bq_analytics::generic_parquet_processor::HasParquetSchema,
     db::parquet::models::token_v2_models::{
         token_claims::CurrentTokenPendingClaim, v1_token_royalty::CurrentTokenRoyaltyV1,
+        v2_token_metadata::CurrentTokenV2Metadata,
     },
 };
 use std::{collections::HashMap, sync::Arc};
@@ -132,6 +133,10 @@ impl ProcessorTrait for ParquetTokenV2Processor {
                 ParquetTypeEnum::CurrentTokenRoyaltyV1,
                 CurrentTokenRoyaltyV1::schema(),
             ),
+            (
+                ParquetTypeEnum::CurrentTokenV2Metadata,
+                CurrentTokenV2Metadata::schema(),
+            ),
         ]
         .into_iter()
         .collect();
diff --git a/rust/sdk-processor/src/steps/parquet_token_v2_processor/parquet_token_v2_extractor.rs b/rust/sdk-processor/src/steps/parquet_token_v2_processor/parquet_token_v2_extractor.rs
index d88e841d..b33c37f4 100644
--- a/rust/sdk-processor/src/steps/parquet_token_v2_processor/parquet_token_v2_extractor.rs
+++ b/rust/sdk-processor/src/steps/parquet_token_v2_processor/parquet_token_v2_extractor.rs
@@ -14,9 +14,11 @@ use processor::{
         common::models::token_v2_models::{
             raw_token_claims::CurrentTokenPendingClaimConvertible,
             raw_v1_token_royalty::CurrentTokenRoyaltyV1Convertible,
+            raw_v2_token_metadata::CurrentTokenV2MetadataConvertible,
         },
         parquet::models::token_v2_models::{
             token_claims::CurrentTokenPendingClaim, v1_token_royalty::CurrentTokenRoyaltyV1,
+            v2_token_metadata::CurrentTokenV2Metadata,
         },
         postgres::models::token_models::tokens::TableMetadataForToken,
     },
@@ -89,7 +91,7 @@ impl Processable for ParquetTokenV2Extractor {
             _current_token_ownerships_v2,
             _current_deleted_token_ownerships_v2,
             _token_activities_v2,
-            _current_token_v2_metadata,
+            raw_current_token_v2_metadata,
             raw_current_token_royalties_v1,
             raw_current_token_claims,
         ) = parse_v2_token(
@@ -112,6 +114,12 @@ impl Processable for ParquetTokenV2Extractor {
             .map(CurrentTokenRoyaltyV1::from_raw)
             .collect();
 
+        let parquet_current_token_v2_metadata: Vec<CurrentTokenV2Metadata> =
+            raw_current_token_v2_metadata
+                .into_iter()
+                .map(CurrentTokenV2Metadata::from_raw)
+                .collect();
+
         // Print the size of each extracted data type
         debug!("Processed data sizes:");
         debug!(
@@ -122,6 +130,10 @@ impl Processable for ParquetTokenV2Extractor {
             " - CurrentTokenRoyaltyV1: {}",
             parquet_current_token_royalties_v1.len()
         );
+        debug!(
+            " - CurrentTokenV2Metadata: {}",
+            parquet_current_token_v2_metadata.len()
+        );
 
         let mut map: HashMap<ParquetTypeEnum, ParquetTypeStructs> = HashMap::new();
 
@@ -137,6 +149,11 @@ impl Processable for ParquetTokenV2Extractor {
                 ParquetTypeEnum::CurrentTokenRoyaltyV1,
                 ParquetTypeStructs::CurrentTokenRoyaltyV1(parquet_current_token_royalties_v1),
             ),
+            (
+                TableFlags::CURRENT_TOKEN_V2_METADATA,
+                ParquetTypeEnum::CurrentTokenV2Metadata,
+                ParquetTypeStructs::CurrentTokenV2Metadata(parquet_current_token_v2_metadata),
+            ),
         ];
 
         // Populate the map based on opt-in tables
diff --git a/rust/sdk-processor/src/steps/token_v2_processor/token_v2_extractor.rs b/rust/sdk-processor/src/steps/token_v2_processor/token_v2_extractor.rs
index 23c67fbe..39819618 100644
--- a/rust/sdk-processor/src/steps/token_v2_processor/token_v2_extractor.rs
+++ b/rust/sdk-processor/src/steps/token_v2_processor/token_v2_extractor.rs
@@ -11,6 +11,7 @@ use processor::{
         common::models::token_v2_models::{
             raw_token_claims::CurrentTokenPendingClaimConvertible,
             raw_v1_token_royalty::CurrentTokenRoyaltyV1Convertible,
+            raw_v2_token_metadata::CurrentTokenV2MetadataConvertible,
         },
         postgres::models::{
             token_models::{token_claims::CurrentTokenPendingClaim, tokens::TableMetadataForToken},
@@ -112,7 +113,7 @@ impl Processable for TokenV2Extractor {
             current_token_ownerships_v2,
             current_deleted_token_ownerships_v2,
             token_activities_v2,
-            current_token_v2_metadata,
+            raw_current_token_v2_metadata,
             raw_current_token_royalties_v1,
             raw_current_token_claims,
         ) = parse_v2_token(
@@ -135,6 +136,12 @@ impl Processable for TokenV2Extractor {
             .map(CurrentTokenRoyaltyV1::from_raw)
             .collect();
 
+        let postgres_current_token_v2_metadata: Vec<CurrentTokenV2Metadata> =
+            raw_current_token_v2_metadata
+                .into_iter()
+                .map(CurrentTokenV2Metadata::from_raw)
+                .collect();
+
         Ok(Some(TransactionContext {
             data: (
                 collections_v2,
@@ -146,7 +153,7 @@ impl Processable for TokenV2Extractor {
                 current_token_ownerships_v2,
                 current_deleted_token_ownerships_v2,
                 token_activities_v2,
-                current_token_v2_metadata,
+                postgres_current_token_v2_metadata,
                 postgres_current_token_royalties_v1,
                 postgres_current_token_claims,
             ),