Index royalty v1 in its own table
rtso committed May 22, 2024
1 parent 93d1fa7 commit b5dca67
Showing 5 changed files with 252 additions and 12 deletions.
24 changes: 14 additions & 10 deletions rust/processor/migrations/2024-05-21-221101_add_royalty_v1/up.sql
@@ -1,19 +1,23 @@
  -- Your SQL goes here
  CREATE TABLE token_royalty (
      transaction_version BIGINT NOT NULL,
-     token_data_id VARCHAR(64) NOT NULL,
-     payee_address VARCHAR(64) NOT NULL,
-     royalty_points_numerator_v1 INT NOT NULL,
-     royalty_points_denominator_v1 INT NOT NULL,
+     token_data_id VARCHAR(66) NOT NULL,
+     payee_address VARCHAR(66) NOT NULL,
+     royalty_points_numerator NUMERIC NOT NULL,
+     royalty_points_denominator NUMERIC NOT NULL,
      token_standard TEXT NOT NULL,
-     (transaction_version, token_data_id) PRIMARY KEY
+     inserted_at TIMESTAMP NOT NULL DEFAULT NOW(),
+     PRIMARY KEY (transaction_version, token_data_id)
  );

  CREATE TABLE current_token_royalty (
-     token_data_id VARCHAR(64) NOT NULL,
-     payee_address VARCHAR(64) NOT NULL,
-     royalty_points_numerator_v1 INT NOT NULL,
-     royalty_points_denominator_v1 INT NOT NULL,
+     token_data_id VARCHAR(66) NOT NULL,
+     payee_address VARCHAR(66) NOT NULL,
+     royalty_points_numerator NUMERIC NOT NULL,
+     royalty_points_denominator NUMERIC NOT NULL,
      token_standard TEXT NOT NULL,
-     token_data_id PRIMARY KEY
+     last_transaction_version BIGINT NOT NULL,
+     last_transaction_timestamp TIMESTAMP NOT NULL,
+     inserted_at TIMESTAMP NOT NULL DEFAULT NOW(),
+     PRIMARY KEY (token_data_id)
  );
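Not part of the commit, but as an illustration of how the new schema is meant to be read: once the processor fills these tables, a token's effective royalty can be looked up from current_token_royalty and turned into a fraction by dividing the numerator by the denominator. A minimal sketch, with a placeholder token_data_id:

SELECT token_data_id,
       payee_address,
       royalty_points_numerator / NULLIF(royalty_points_denominator, 0) AS royalty_fraction,
       last_transaction_version
FROM current_token_royalty
WHERE token_data_id = '<token_data_id>';  -- placeholder, pass a real 66-char hex id

NULLIF guards against a zero denominator by returning NULL instead of raising a division error.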
1 change: 1 addition & 0 deletions rust/processor/src/models/token_v2_models/mod.rs
@@ -6,4 +6,5 @@ pub mod v2_token_activities;
pub mod v2_token_datas;
pub mod v2_token_metadata;
pub mod v2_token_ownerships;
pub mod v2_token_royalty;
pub mod v2_token_utils;
108 changes: 108 additions & 0 deletions rust/processor/src/models/token_v2_models/v2_token_royalty.rs
@@ -0,0 +1,108 @@
// Copyright © Aptos Foundation
// SPDX-License-Identifier: Apache-2.0

// This is required because a diesel macro makes clippy sad
#![allow(clippy::extra_unused_lifetimes)]
#![allow(clippy::unused_unit)]

use super::v2_token_utils::TokenStandard;
use crate::{
models::token_models::token_utils::TokenWriteSet,
schema::{current_token_royalty, token_royalty},
};
use aptos_protos::transaction::v1::WriteTableItem;
use bigdecimal::BigDecimal;
use field_count::FieldCount;
use serde::{Deserialize, Serialize};

#[derive(Clone, Debug, Deserialize, FieldCount, Identifiable, Insertable, Serialize)]
#[diesel(primary_key(token_data_id, transaction_version))]
#[diesel(table_name = token_royalty)]
pub struct TokenRoyalty {
pub transaction_version: i64,
pub token_data_id: String,
pub payee_address: String,
pub royalty_points_numerator: BigDecimal,
pub royalty_points_denominator: BigDecimal,
pub token_standard: String,
}

#[derive(Clone, Debug, Deserialize, FieldCount, Identifiable, Insertable, Serialize)]
#[diesel(primary_key(token_data_id))]
#[diesel(table_name = current_token_royalty)]
pub struct CurrentTokenRoyalty {
pub token_data_id: String,
pub payee_address: String,
pub royalty_points_numerator: BigDecimal,
pub royalty_points_denominator: BigDecimal,
pub token_standard: String,
pub last_transaction_version: i64,
pub last_transaction_timestamp: chrono::NaiveDateTime,
}

impl TokenRoyalty {
// Royalty for v2 token is more complicated and not supported yet. For token v2, royalty can be on the collection (default) or on
// the token (override).
pub fn get_v1_from_write_table_item(
write_table_item: &WriteTableItem,
transaction_version: i64,
transaction_timestamp: chrono::NaiveDateTime,
) -> anyhow::Result<Option<(Self, CurrentTokenRoyalty)>> {
let table_item_data = write_table_item.data.as_ref().unwrap();

let maybe_token_data = match TokenWriteSet::from_table_item_type(
table_item_data.value_type.as_str(),
&table_item_data.value,
transaction_version,
)? {
Some(TokenWriteSet::TokenData(inner)) => Some(inner),
_ => None,
};

if let Some(token_data) = maybe_token_data {
let maybe_token_data_id = match TokenWriteSet::from_table_item_type(
table_item_data.key_type.as_str(),
&table_item_data.key,
transaction_version,
)? {
Some(TokenWriteSet::TokenDataId(inner)) => Some(inner),
_ => None,
};
if let Some(token_data_id_struct) = maybe_token_data_id {
let token_data_id = token_data_id_struct.to_hash();
let payee_address = token_data.royalty.get_payee_address();
let royalty_points_numerator = token_data.royalty.royalty_points_numerator.clone();
let royalty_points_denominator =
token_data.royalty.royalty_points_denominator.clone();

return Ok(Some((
TokenRoyalty {
transaction_version,
token_data_id: token_data_id.clone(),
payee_address: payee_address.clone(),
royalty_points_numerator: royalty_points_numerator.clone(),
royalty_points_denominator: royalty_points_denominator.clone(),
token_standard: TokenStandard::V1.to_string(),
},
CurrentTokenRoyalty {
token_data_id,
payee_address,
royalty_points_numerator,
royalty_points_denominator,
token_standard: TokenStandard::V1.to_string(),
last_transaction_version: transaction_version,
last_transaction_timestamp: transaction_timestamp,
},
)));
} else {
tracing::warn!(
transaction_version,
key_type = table_item_data.key_type,
key = table_item_data.key,
"Expecting token_data_id as key for value = token_data"
);
}
}
Ok(None)
}
}
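For context (commentary, not part of the diff): token_royalty is append-only and keyed on (transaction_version, token_data_id), so it records every royalty write per transaction, while current_token_royalty keeps only the latest row per token. A sketch of reading a token's royalty history back out, again with a placeholder id:

SELECT transaction_version,
       payee_address,
       royalty_points_numerator,
       royalty_points_denominator,
       token_standard
FROM token_royalty
WHERE token_data_id = '<token_data_id>'  -- placeholder
ORDER BY transaction_version;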
100 changes: 98 additions & 2 deletions rust/processor/src/processors/token_v2_processor.rs
@@ -19,6 +19,7 @@ use crate::{
CurrentTokenOwnershipV2, CurrentTokenOwnershipV2PK, NFTOwnershipV2,
TokenOwnershipV2,
},
v2_token_royalty::{CurrentTokenRoyalty, TokenRoyalty},
v2_token_utils::{
AptosCollection, Burn, BurnEvent, ConcurrentSupply, FixedSupply, MintEvent,
PropertyMapModel, TokenIdentifiers, TokenV2, TokenV2Burned, TokenV2Minted,
@@ -106,6 +107,7 @@ async fn insert_to_db(
),
token_activities_v2: &[TokenActivityV2],
current_token_v2_metadata: &[CurrentTokenV2Metadata],
(token_royalties_v2, current_token_royalties_v2): (&[TokenRoyalty], &[CurrentTokenRoyalty]),
per_table_chunk_sizes: &AHashMap<String, usize>,
) -> Result<(), diesel::result::Error> {
tracing::trace!(
@@ -191,14 +193,29 @@
),
);
let ct_v2 = execute_in_chunks(
-     conn,
+     conn.clone(),
insert_current_token_v2_metadatas_query,
current_token_v2_metadata,
get_config_table_chunk_size::<CurrentTokenV2Metadata>(
"current_token_v2_metadata",
per_table_chunk_sizes,
),
);
let tr_v2 = execute_in_chunks(
conn.clone(),
insert_token_royalties_query,
token_royalties_v2,
get_config_table_chunk_size::<TokenRoyalty>("token_royalty", per_table_chunk_sizes),
);
let ctr_v2 = execute_in_chunks(
conn,
insert_current_token_royalties_query,
current_token_royalties_v2,
get_config_table_chunk_size::<CurrentTokenRoyalty>(
"current_token_royalty",
per_table_chunk_sizes,
),
);

let (
coll_v2_res,
@@ -211,7 +228,11 @@
cdto_v2_res,
ta_v2_res,
ct_v2_res,
- ) = tokio::join!(coll_v2, td_v2, to_v2, cc_v2, ctd_v2, cdtd_v2, cto_v2, cdto_v2, ta_v2, ct_v2,);
+     tr_v2_res,
+     ctr_v2_res,
+ ) = tokio::join!(
+     coll_v2, td_v2, to_v2, cc_v2, ctd_v2, cdtd_v2, cto_v2, cdto_v2, ta_v2, ct_v2, tr_v2, ctr_v2
+ );

for res in [
coll_v2_res,
@@ -224,6 +245,8 @@
cdto_v2_res,
ta_v2_res,
ct_v2_res,
tr_v2_res,
ctr_v2_res,
] {
res?;
}
@@ -480,6 +503,48 @@ fn insert_current_token_v2_metadatas_query(
)
}

fn insert_token_royalties_query(
items_to_insert: Vec<TokenRoyalty>,
) -> (
impl QueryFragment<Pg> + diesel::query_builder::QueryId + Send,
Option<&'static str>,
) {
use schema::token_royalty::dsl::*;

(
diesel::insert_into(schema::token_royalty::table)
.values(items_to_insert)
.on_conflict((transaction_version, token_data_id))
.do_nothing(),
None,
)
}

fn insert_current_token_royalties_query(
items_to_insert: Vec<CurrentTokenRoyalty>,
) -> (
impl QueryFragment<Pg> + diesel::query_builder::QueryId + Send,
Option<&'static str>,
) {
use schema::current_token_royalty::dsl::*;

(
diesel::insert_into(schema::current_token_royalty::table)
.values(items_to_insert)
.on_conflict(token_data_id)
.do_update()
.set((
payee_address.eq(excluded(payee_address)),
royalty_points_numerator.eq(excluded(royalty_points_numerator)),
royalty_points_denominator.eq(excluded(royalty_points_denominator)),
token_standard.eq(excluded(token_standard)),
last_transaction_version.eq(excluded(last_transaction_version)),
last_transaction_timestamp.eq(excluded(last_transaction_timestamp)),
)),
Some(" WHERE current_token_royalty.last_transaction_version <= excluded.last_transaction_version "),
)
}
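A note for readers less familiar with these diesel helpers (commentary, not part of the diff): insert_token_royalties_query inserts with ON CONFLICT (transaction_version, token_data_id) DO NOTHING, so replaying a transaction is idempotent, while insert_current_token_royalties_query upserts on token_data_id and the trailing WHERE string keeps a late-arriving older transaction from overwriting newer state. The SQL it builds is roughly (values elided):

INSERT INTO current_token_royalty (token_data_id, payee_address, royalty_points_numerator,
                                   royalty_points_denominator, token_standard,
                                   last_transaction_version, last_transaction_timestamp)
VALUES (...)
ON CONFLICT (token_data_id) DO UPDATE SET
    payee_address = EXCLUDED.payee_address,
    royalty_points_numerator = EXCLUDED.royalty_points_numerator,
    royalty_points_denominator = EXCLUDED.royalty_points_denominator,
    token_standard = EXCLUDED.token_standard,
    last_transaction_version = EXCLUDED.last_transaction_version,
    last_transaction_timestamp = EXCLUDED.last_transaction_timestamp
WHERE current_token_royalty.last_transaction_version <= EXCLUDED.last_transaction_version;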

#[async_trait]
impl ProcessorTrait for TokenV2Processor {
fn name(&self) -> &'static str {
@@ -517,6 +582,8 @@ impl ProcessorTrait for TokenV2Processor {
current_deleted_token_ownerships_v2,
token_activities_v2,
current_token_v2_metadata,
token_royalties,
current_token_royalties,
) = parse_v2_token(
&transactions,
&table_handle_to_owner,
@@ -545,6 +612,7 @@
),
&token_activities_v2,
&current_token_v2_metadata,
(&token_royalties, &current_token_royalties),
&self.per_table_chunk_sizes,
)
.await;
@@ -593,12 +661,16 @@ async fn parse_v2_token(
Vec<CurrentTokenOwnershipV2>, // deleted token ownerships
Vec<TokenActivityV2>,
Vec<CurrentTokenV2Metadata>,
Vec<TokenRoyalty>,
Vec<CurrentTokenRoyalty>,
) {
// Token V2 and V1 combined
let mut collections_v2 = vec![];
let mut token_datas_v2 = vec![];
let mut token_ownerships_v2 = vec![];
let mut token_activities_v2 = vec![];
let mut token_royalties = vec![];

let mut current_collections_v2: AHashMap<CurrentCollectionV2PK, CurrentCollectionV2> =
AHashMap::new();
let mut current_token_datas_v2: AHashMap<CurrentTokenDataV2PK, CurrentTokenDataV2> =
@@ -619,6 +691,8 @@
// Basically token properties
let mut current_token_v2_metadata: AHashMap<CurrentTokenV2MetadataPK, CurrentTokenV2Metadata> =
AHashMap::new();
let mut current_token_royalties: AHashMap<CurrentTokenDataV2PK, CurrentTokenRoyalty> =
AHashMap::new();

// Code above is inefficient (multiple passthroughs) so I'm approaching TokenV2 with a cleaner code structure
for txn in transactions {
Expand Down Expand Up @@ -824,6 +898,20 @@ async fn parse_v2_token(
current_token_data,
);
}
if let Some((token_royalty, current_token_royalty)) =
TokenRoyalty::get_v1_from_write_table_item(
table_item,
txn_version,
txn_timestamp,
)
.unwrap()
{
token_royalties.push(token_royalty);
current_token_royalties.insert(
current_token_royalty.token_data_id.clone(),
current_token_royalty,
);
}
if let Some((token_ownership, current_token_ownership)) =
TokenOwnershipV2::get_v1_from_write_table_item(
table_item,
Expand Down Expand Up @@ -1075,6 +1163,8 @@ async fn parse_v2_token(
}
}
}
tracing::info!("token royalties {}", token_royalties.len());
tracing::info!("current token royalties {}", current_token_royalties.len());

// Getting list of values and sorting by pk in order to avoid postgres deadlock since we're doing multi threaded db writes
let mut current_collections_v2 = current_collections_v2
Expand All @@ -1095,6 +1185,9 @@ async fn parse_v2_token(
let mut current_deleted_token_ownerships_v2 = current_deleted_token_ownerships_v2
.into_values()
.collect::<Vec<CurrentTokenOwnershipV2>>();
let mut current_token_royalties = current_token_royalties
.into_values()
.collect::<Vec<CurrentTokenRoyalty>>();

// Sort by PK
current_collections_v2.sort_by(|a, b| a.collection_id.cmp(&b.collection_id));
@@ -1131,6 +1224,7 @@
&b.storage_id,
))
});
current_token_royalties.sort_by(|a, b| a.token_data_id.cmp(&b.token_data_id));

(
collections_v2,
Expand All @@ -1143,5 +1237,7 @@ async fn parse_v2_token(
current_deleted_token_ownerships_v2,
token_activities_v2,
current_token_v2_metadata,
token_royalties,
current_token_royalties,
)
}