diff --git a/rust/processor/migrations/2024-05-21-221101_add_royalty_v1/up.sql b/rust/processor/migrations/2024-05-21-221101_add_royalty_v1/up.sql index b09cf573c..3f4864c97 100644 --- a/rust/processor/migrations/2024-05-21-221101_add_royalty_v1/up.sql +++ b/rust/processor/migrations/2024-05-21-221101_add_royalty_v1/up.sql @@ -1,19 +1,23 @@ -- Your SQL goes here CREATE TABLE token_royalty ( transaction_version BIGINT NOT NULL, - token_data_id VARCHAR(64) NOT NULL, - payee_address VARCHAR(64) NOT NULL, - royalty_points_numerator_v1 INT NOT NULL, - royalty_points_denominator_v1 INT NOT NULL, + token_data_id VARCHAR(66) NOT NULL, + payee_address VARCHAR(66) NOT NULL, + royalty_points_numerator NUMERIC NOT NULL, + royalty_points_denominator NUMERIC NOT NULL, token_standard TEXT NOT NULL, - (transaction_version, token_data_id) PRIMARY KEY + inserted_at TIMESTAMP NOT NULL DEFAULT NOW(), + PRIMARY KEY (transaction_version, token_data_id) ); CREATE TABLE current_token_royalty ( - token_data_id VARCHAR(64) NOT NULL, - payee_address VARCHAR(64) NOT NULL, - royalty_points_numerator_v1 INT NOT NULL, - royalty_points_denominator_v1 INT NOT NULL, + token_data_id VARCHAR(66) NOT NULL, + payee_address VARCHAR(66) NOT NULL, + royalty_points_numerator NUMERIC NOT NULL, + royalty_points_denominator NUMERIC NOT NULL, token_standard TEXT NOT NULL, - token_data_id PRIMARY KEY + last_transaction_version BIGINT NOT NULL, + last_transaction_timestamp TIMESTAMP NOT NULL, + inserted_at TIMESTAMP NOT NULL DEFAULT NOW(), + PRIMARY KEY (token_data_id) ); diff --git a/rust/processor/src/models/token_v2_models/mod.rs b/rust/processor/src/models/token_v2_models/mod.rs index ab7c4616c..6c5f06a07 100644 --- a/rust/processor/src/models/token_v2_models/mod.rs +++ b/rust/processor/src/models/token_v2_models/mod.rs @@ -6,4 +6,5 @@ pub mod v2_token_activities; pub mod v2_token_datas; pub mod v2_token_metadata; pub mod v2_token_ownerships; +pub mod v2_token_royalty; pub mod v2_token_utils; diff --git 
a/rust/processor/src/models/token_v2_models/v2_token_royalty.rs b/rust/processor/src/models/token_v2_models/v2_token_royalty.rs new file mode 100644 index 000000000..3522942e5 --- /dev/null +++ b/rust/processor/src/models/token_v2_models/v2_token_royalty.rs @@ -0,0 +1,108 @@ +// Copyright © Aptos Foundation +// SPDX-License-Identifier: Apache-2.0 + +// This is required because a diesel macro makes clippy sad +#![allow(clippy::extra_unused_lifetimes)] +#![allow(clippy::unused_unit)] + +use super::v2_token_utils::TokenStandard; +use crate::{ + models::token_models::token_utils::TokenWriteSet, + schema::{current_token_royalty, token_royalty}, +}; +use aptos_protos::transaction::v1::WriteTableItem; +use bigdecimal::BigDecimal; +use field_count::FieldCount; +use serde::{Deserialize, Serialize}; + +#[derive(Clone, Debug, Deserialize, FieldCount, Identifiable, Insertable, Serialize)] +#[diesel(primary_key(token_data_id, transaction_version))] +#[diesel(table_name = token_royalty)] +pub struct TokenRoyalty { + pub transaction_version: i64, + pub token_data_id: String, + pub payee_address: String, + pub royalty_points_numerator: BigDecimal, + pub royalty_points_denominator: BigDecimal, + pub token_standard: String, +} + +#[derive(Clone, Debug, Deserialize, FieldCount, Identifiable, Insertable, Serialize)] +#[diesel(primary_key(token_data_id))] +#[diesel(table_name = current_token_royalty)] +pub struct CurrentTokenRoyalty { + pub token_data_id: String, + pub payee_address: String, + pub royalty_points_numerator: BigDecimal, + pub royalty_points_denominator: BigDecimal, + pub token_standard: String, + pub last_transaction_version: i64, + pub last_transaction_timestamp: chrono::NaiveDateTime, +} + +impl TokenRoyalty { + // Royalty for v2 token is more complicated and not supported yet. For token v2, royalty can be on the collection (default) or on + // the token (override). 
+ pub fn get_v1_from_write_table_item( + write_table_item: &WriteTableItem, + transaction_version: i64, + transaction_timestamp: chrono::NaiveDateTime, + ) -> anyhow::Result<Option<(TokenRoyalty, CurrentTokenRoyalty)>> { + let table_item_data = write_table_item.data.as_ref().unwrap(); + + let maybe_token_data = match TokenWriteSet::from_table_item_type( + table_item_data.value_type.as_str(), + &table_item_data.value, + transaction_version, + )? { + Some(TokenWriteSet::TokenData(inner)) => Some(inner), + _ => None, + }; + + if let Some(token_data) = maybe_token_data { + let maybe_token_data_id = match TokenWriteSet::from_table_item_type( + table_item_data.key_type.as_str(), + &table_item_data.key, + transaction_version, + )? { + Some(TokenWriteSet::TokenDataId(inner)) => Some(inner), + _ => None, + }; + if let Some(token_data_id_struct) = maybe_token_data_id { + let token_data_id = token_data_id_struct.to_hash(); + let payee_address = token_data.royalty.get_payee_address(); + let royalty_points_numerator = token_data.royalty.royalty_points_numerator.clone(); + let royalty_points_denominator = + token_data.royalty.royalty_points_denominator.clone(); + + return Ok(Some(( + TokenRoyalty { + transaction_version, + token_data_id: token_data_id.clone(), + payee_address: payee_address.clone(), + royalty_points_numerator: royalty_points_numerator.clone(), + royalty_points_denominator: royalty_points_denominator.clone(), + token_standard: TokenStandard::V1.to_string(), + }, + CurrentTokenRoyalty { + token_data_id, + payee_address, + royalty_points_numerator, + royalty_points_denominator, + token_standard: TokenStandard::V1.to_string(), + last_transaction_version: transaction_version, + last_transaction_timestamp: transaction_timestamp, + }, + ))); + } else { + tracing::warn!( + transaction_version, + key_type = table_item_data.key_type, + key = table_item_data.key, + "Expecting token_data_id as key for value = token_data" + ); + } + } + Ok(None) + } +} diff --git a/rust/processor/src/processors/token_v2_processor.rs
b/rust/processor/src/processors/token_v2_processor.rs index 5dc4cc4db..4a02cb4cd 100644 --- a/rust/processor/src/processors/token_v2_processor.rs +++ b/rust/processor/src/processors/token_v2_processor.rs @@ -19,6 +19,7 @@ use crate::{ CurrentTokenOwnershipV2, CurrentTokenOwnershipV2PK, NFTOwnershipV2, TokenOwnershipV2, }, + v2_token_royalty::{CurrentTokenRoyalty, TokenRoyalty}, v2_token_utils::{ AptosCollection, Burn, BurnEvent, ConcurrentSupply, FixedSupply, MintEvent, PropertyMapModel, TokenIdentifiers, TokenV2, TokenV2Burned, TokenV2Minted, @@ -106,6 +107,7 @@ async fn insert_to_db( ), token_activities_v2: &[TokenActivityV2], current_token_v2_metadata: &[CurrentTokenV2Metadata], + (token_royalties_v2, current_token_royalties_v2): (&[TokenRoyalty], &[CurrentTokenRoyalty]), per_table_chunk_sizes: &AHashMap, ) -> Result<(), diesel::result::Error> { tracing::trace!( @@ -191,7 +193,7 @@ async fn insert_to_db( ), ); let ct_v2 = execute_in_chunks( - conn, + conn.clone(), insert_current_token_v2_metadatas_query, current_token_v2_metadata, get_config_table_chunk_size::( @@ -199,6 +201,21 @@ async fn insert_to_db( per_table_chunk_sizes, ), ); + let tr_v2 = execute_in_chunks( + conn.clone(), + insert_token_royalties_query, + token_royalties_v2, + get_config_table_chunk_size::("token_royalty", per_table_chunk_sizes), + ); + let ctr_v2 = execute_in_chunks( + conn, + insert_current_token_royalties_query, + current_token_royalties_v2, + get_config_table_chunk_size::( + "current_token_royalty", + per_table_chunk_sizes, + ), + ); let ( coll_v2_res, @@ -211,7 +228,11 @@ async fn insert_to_db( cdto_v2_res, ta_v2_res, ct_v2_res, - ) = tokio::join!(coll_v2, td_v2, to_v2, cc_v2, ctd_v2, cdtd_v2, cto_v2, cdto_v2, ta_v2, ct_v2,); + tr_v2_res, + ctr_v2_res, + ) = tokio::join!( + coll_v2, td_v2, to_v2, cc_v2, ctd_v2, cdtd_v2, cto_v2, cdto_v2, ta_v2, ct_v2, tr_v2, ctr_v2 + ); for res in [ coll_v2_res, @@ -224,6 +245,8 @@ async fn insert_to_db( cdto_v2_res, ta_v2_res, ct_v2_res, + 
tr_v2_res, + ctr_v2_res, ] { res?; } @@ -480,6 +503,48 @@ fn insert_current_token_v2_metadatas_query( ) } +fn insert_token_royalties_query( + items_to_insert: Vec<TokenRoyalty>, +) -> ( + impl QueryFragment<Pg> + diesel::query_builder::QueryId + Send, + Option<&'static str>, +) { + use schema::token_royalty::dsl::*; + + ( + diesel::insert_into(schema::token_royalty::table) + .values(items_to_insert) + .on_conflict((transaction_version, token_data_id)) + .do_nothing(), + None, + ) +} + +fn insert_current_token_royalties_query( + items_to_insert: Vec<CurrentTokenRoyalty>, +) -> ( + impl QueryFragment<Pg> + diesel::query_builder::QueryId + Send, + Option<&'static str>, +) { + use schema::current_token_royalty::dsl::*; + + ( + diesel::insert_into(schema::current_token_royalty::table) + .values(items_to_insert) + .on_conflict(token_data_id) + .do_update() + .set(( + payee_address.eq(excluded(payee_address)), + royalty_points_numerator.eq(excluded(royalty_points_numerator)), + royalty_points_denominator.eq(excluded(royalty_points_denominator)), + token_standard.eq(excluded(token_standard)), + last_transaction_version.eq(excluded(last_transaction_version)), + last_transaction_timestamp.eq(excluded(last_transaction_timestamp)), + )), + Some(" WHERE current_token_royalty.last_transaction_version <= excluded.last_transaction_version "), + ) +} + #[async_trait] impl ProcessorTrait for TokenV2Processor { fn name(&self) -> &'static str { @@ -517,6 +582,8 @@ impl ProcessorTrait for TokenV2Processor { current_deleted_token_ownerships_v2, token_activities_v2, current_token_v2_metadata, + token_royalties, + current_token_royalties, ) = parse_v2_token( &transactions, &table_handle_to_owner, @@ -545,6 +612,7 @@ impl ProcessorTrait for TokenV2Processor { ), &token_activities_v2, &current_token_v2_metadata, + (&token_royalties, &current_token_royalties), &self.per_table_chunk_sizes, ) .await; @@ -593,12 +661,16 @@ async fn parse_v2_token( Vec<CurrentTokenOwnershipV2>, // deleted token ownerships Vec<TokenActivityV2>, Vec<CurrentTokenV2Metadata>, + Vec<TokenRoyalty>, + Vec<CurrentTokenRoyalty>, ) { // Token V2 and V1 combined let mut 
collections_v2 = vec![]; let mut token_datas_v2 = vec![]; let mut token_ownerships_v2 = vec![]; let mut token_activities_v2 = vec![]; + let mut token_royalties = vec![]; + let mut current_collections_v2: AHashMap<CurrentCollectionV2PK, CurrentCollectionV2> = AHashMap::new(); let mut current_token_datas_v2: AHashMap<CurrentTokenDataV2PK, CurrentTokenDataV2> = @@ -619,6 +691,8 @@ async fn parse_v2_token( // Basically token properties let mut current_token_v2_metadata: AHashMap<CurrentTokenV2MetadataPK, CurrentTokenV2Metadata> = AHashMap::new(); + let mut current_token_royalties: AHashMap<String, CurrentTokenRoyalty> = + AHashMap::new(); // Code above is inefficient (multiple passthroughs) so I'm approaching TokenV2 with a cleaner code structure for txn in transactions { @@ -824,6 +898,20 @@ async fn parse_v2_token( current_token_data, ); } + if let Some((token_royalty, current_token_royalty)) = + TokenRoyalty::get_v1_from_write_table_item( + table_item, + txn_version, + txn_timestamp, + ) + .unwrap() + { + token_royalties.push(token_royalty); + current_token_royalties.insert( + current_token_royalty.token_data_id.clone(), + current_token_royalty, + ); + } if let Some((token_ownership, current_token_ownership)) = TokenOwnershipV2::get_v1_from_write_table_item( table_item, @@ -1075,6 +1163,8 @@ async fn parse_v2_token( } } } + tracing::info!("token royalties {}", token_royalties.len()); + tracing::info!("current token royalties {}", current_token_royalties.len()); // Getting list of values and sorting by pk in order to avoid postgres deadlock since we're doing multi threaded db writes let mut current_collections_v2 = current_collections_v2 .into_values() .collect::<Vec<_>>(); @@ -1095,6 +1185,9 @@ async fn parse_v2_token( let mut current_deleted_token_ownerships_v2 = current_deleted_token_ownerships_v2 .into_values() .collect::<Vec<_>>(); + let mut current_token_royalties = current_token_royalties + .into_values() + .collect::<Vec<_>>(); // Sort by PK current_collections_v2.sort_by(|a, b| a.collection_id.cmp(&b.collection_id)); @@ -1131,6 +1224,7 @@ async fn parse_v2_token( &b.storage_id, )) }); + current_token_royalties.sort_by(|a, b| a.token_data_id.cmp(&b.token_data_id)); ( 
collections_v2, @@ -1143,5 +1237,7 @@ async fn parse_v2_token( current_deleted_token_ownerships_v2, token_activities_v2, current_token_v2_metadata, + token_royalties, + current_token_royalties, ) } diff --git a/rust/processor/src/schema.rs b/rust/processor/src/schema.rs index 45fab0e8f..9a233539a 100644 --- a/rust/processor/src/schema.rs +++ b/rust/processor/src/schema.rs @@ -620,6 +620,21 @@ diesel::table! { } } +diesel::table! { + current_token_royalty (token_data_id) { + #[max_length = 66] + token_data_id -> Varchar, + #[max_length = 66] + payee_address -> Varchar, + royalty_points_numerator -> Numeric, + royalty_points_denominator -> Numeric, + token_standard -> Text, + last_transaction_version -> Int8, + last_transaction_timestamp -> Timestamp, + inserted_at -> Timestamp, + } +} + diesel::table! { current_token_v2_metadata (object_address, resource_type) { #[max_length = 66] @@ -1131,6 +1146,20 @@ diesel::table! { } } +diesel::table! { + token_royalty (transaction_version, token_data_id) { + transaction_version -> Int8, + #[max_length = 66] + token_data_id -> Varchar, + #[max_length = 66] + payee_address -> Varchar, + royalty_points_numerator -> Numeric, + royalty_points_denominator -> Numeric, + token_standard -> Text, + inserted_at -> Timestamp, + } +} + diesel::table! { tokens (token_data_id_hash, property_version, transaction_version) { #[max_length = 64] @@ -1265,6 +1294,7 @@ diesel::allow_tables_to_appear_in_same_query!( current_token_ownerships, current_token_ownerships_v2, current_token_pending_claims, + current_token_royalty, current_token_v2_metadata, delegated_staking_activities, delegated_staking_pool_balances, @@ -1293,6 +1323,7 @@ diesel::allow_tables_to_appear_in_same_query!( token_datas_v2, token_ownerships, token_ownerships_v2, + token_royalty, tokens, transaction_size_info, transactions,