Skip to content

Commit

Permalink
[parquet-sdk][token_v2] migrate CurrentTokenRoyaltyV1
Browse files Browse the repository at this point in the history
  • Loading branch information
yuunlimm committed Dec 13, 2024
1 parent a8fb814 commit b5f6c15
Show file tree
Hide file tree
Showing 12 changed files with 272 additions and 91 deletions.
1 change: 1 addition & 0 deletions rust/processor/src/db/common/models/token_v2_models/mod.rs
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
pub mod raw_token_claims;
pub mod raw_v1_token_royalty;
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
// Copyright © Aptos Foundation
// SPDX-License-Identifier: Apache-2.0

// This is required because a diesel macro makes clippy sad
#![allow(clippy::extra_unused_lifetimes)]
#![allow(clippy::unused_unit)]

use crate::db::postgres::models::token_models::token_utils::TokenWriteSet;
use aptos_protos::transaction::v1::WriteTableItem;
use bigdecimal::BigDecimal;
use serde::{Deserialize, Serialize};

#[derive(Clone, Debug, Deserialize, Serialize, PartialEq, Eq)]
pub struct RawCurrentTokenRoyaltyV1 {
pub token_data_id: String,
pub payee_address: String,
pub royalty_points_numerator: BigDecimal,
pub royalty_points_denominator: BigDecimal,
pub last_transaction_version: i64,
pub last_transaction_timestamp: chrono::NaiveDateTime,
}

impl Ord for RawCurrentTokenRoyaltyV1 {
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
self.token_data_id.cmp(&other.token_data_id)
}
}
impl PartialOrd for RawCurrentTokenRoyaltyV1 {
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
Some(self.cmp(other))
}
}

impl RawCurrentTokenRoyaltyV1 {
pub fn pk(&self) -> String {
self.token_data_id.clone()
}

// Royalty for v2 token is more complicated and not supported yet. For token v2, royalty can be on the collection (default) or on
// the token (override).
pub fn get_v1_from_write_table_item(
write_table_item: &WriteTableItem,
transaction_version: i64,
transaction_timestamp: chrono::NaiveDateTime,
) -> anyhow::Result<Option<Self>> {
let table_item_data = write_table_item.data.as_ref().unwrap();

let maybe_token_data = match TokenWriteSet::from_table_item_type(
table_item_data.value_type.as_str(),
&table_item_data.value,
transaction_version,
)? {
Some(TokenWriteSet::TokenData(inner)) => Some(inner),
_ => None,
};

if let Some(token_data) = maybe_token_data {
let maybe_token_data_id = match TokenWriteSet::from_table_item_type(
table_item_data.key_type.as_str(),
&table_item_data.key,
transaction_version,
)? {
Some(TokenWriteSet::TokenDataId(inner)) => Some(inner),
_ => None,
};
if let Some(token_data_id_struct) = maybe_token_data_id {
// token data id is the 0x{hash} version of the creator, collection name, and token name
let token_data_id = token_data_id_struct.to_id();
let payee_address = token_data.royalty.get_payee_address();
let royalty_points_numerator = token_data.royalty.royalty_points_numerator.clone();
let royalty_points_denominator =
token_data.royalty.royalty_points_denominator.clone();

return Ok(Some(Self {
token_data_id,
payee_address,
royalty_points_numerator,
royalty_points_denominator,
last_transaction_version: transaction_version,
last_transaction_timestamp: transaction_timestamp,
}));
} else {
tracing::warn!(
transaction_version,
key_type = table_item_data.key_type,
key = table_item_data.key,
"Expecting token_data_id as key for value = token_data"
);
}
}
Ok(None)
}
}

pub trait CurrentTokenRoyaltyV1Convertible {
fn from_raw(raw_item: RawCurrentTokenRoyaltyV1) -> Self;
}
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
pub mod token_claims;
pub mod v1_token_royalty;
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
// Copyright © Aptos Foundation
// SPDX-License-Identifier: Apache-2.0

// This is required because a diesel macro makes clippy sad
#![allow(clippy::extra_unused_lifetimes)]
#![allow(clippy::unused_unit)]

use crate::{
bq_analytics::generic_parquet_processor::{GetTimeStamp, HasVersion, NamedTable},
db::common::models::token_v2_models::raw_v1_token_royalty::{
CurrentTokenRoyaltyV1Convertible, RawCurrentTokenRoyaltyV1,
},
};
use allocative_derive::Allocative;
use field_count::FieldCount;
use parquet_derive::ParquetRecordWriter;
use serde::{Deserialize, Serialize};

#[derive(
Allocative, Clone, Debug, Default, Deserialize, FieldCount, ParquetRecordWriter, Serialize,
)]
pub struct CurrentTokenRoyaltyV1 {
pub token_data_id: String,
pub payee_address: String,
pub royalty_points_numerator: String, // String format of BigDecimal
pub royalty_points_denominator: String, // String format of BigDecimal
pub last_transaction_version: i64,
#[allocative(skip)]
pub last_transaction_timestamp: chrono::NaiveDateTime,
}

impl NamedTable for CurrentTokenRoyaltyV1 {
const TABLE_NAME: &'static str = "current_token_royalty_v1";
}

impl HasVersion for CurrentTokenRoyaltyV1 {
fn version(&self) -> i64 {
self.last_transaction_version
}
}

impl GetTimeStamp for CurrentTokenRoyaltyV1 {
fn get_timestamp(&self) -> chrono::NaiveDateTime {
self.last_transaction_timestamp
}
}

impl CurrentTokenRoyaltyV1Convertible for CurrentTokenRoyaltyV1 {
// TODO: consider returning a Result
fn from_raw(raw_item: RawCurrentTokenRoyaltyV1) -> Self {
Self {
token_data_id: raw_item.token_data_id,
payee_address: raw_item.payee_address,
royalty_points_numerator: raw_item.royalty_points_numerator.to_string(),
royalty_points_denominator: raw_item.royalty_points_denominator.to_string(),
last_transaction_version: raw_item.last_transaction_version,
last_transaction_timestamp: raw_item.last_transaction_timestamp,
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,11 @@
#![allow(clippy::unused_unit)]

use crate::{
db::postgres::models::token_models::token_utils::TokenWriteSet,
db::common::models::token_v2_models::raw_v1_token_royalty::{
CurrentTokenRoyaltyV1Convertible, RawCurrentTokenRoyaltyV1,
},
schema::current_token_royalty_v1,
};
use aptos_protos::transaction::v1::WriteTableItem;
use bigdecimal::BigDecimal;
use field_count::FieldCount;
use serde::{Deserialize, Serialize};
Expand Down Expand Up @@ -39,63 +40,15 @@ impl PartialOrd for CurrentTokenRoyaltyV1 {
}
}

impl CurrentTokenRoyaltyV1 {
pub fn pk(&self) -> String {
self.token_data_id.clone()
}

// Royalty for v2 token is more complicated and not supported yet. For token v2, royalty can be on the collection (default) or on
// the token (override).
pub fn get_v1_from_write_table_item(
write_table_item: &WriteTableItem,
transaction_version: i64,
transaction_timestamp: chrono::NaiveDateTime,
) -> anyhow::Result<Option<Self>> {
let table_item_data = write_table_item.data.as_ref().unwrap();

let maybe_token_data = match TokenWriteSet::from_table_item_type(
table_item_data.value_type.as_str(),
&table_item_data.value,
transaction_version,
)? {
Some(TokenWriteSet::TokenData(inner)) => Some(inner),
_ => None,
};

if let Some(token_data) = maybe_token_data {
let maybe_token_data_id = match TokenWriteSet::from_table_item_type(
table_item_data.key_type.as_str(),
&table_item_data.key,
transaction_version,
)? {
Some(TokenWriteSet::TokenDataId(inner)) => Some(inner),
_ => None,
};
if let Some(token_data_id_struct) = maybe_token_data_id {
// token data id is the 0x{hash} version of the creator, collection name, and token name
let token_data_id = token_data_id_struct.to_id();
let payee_address = token_data.royalty.get_payee_address();
let royalty_points_numerator = token_data.royalty.royalty_points_numerator.clone();
let royalty_points_denominator =
token_data.royalty.royalty_points_denominator.clone();

return Ok(Some(Self {
token_data_id,
payee_address,
royalty_points_numerator,
royalty_points_denominator,
last_transaction_version: transaction_version,
last_transaction_timestamp: transaction_timestamp,
}));
} else {
tracing::warn!(
transaction_version,
key_type = table_item_data.key_type,
key = table_item_data.key,
"Expecting token_data_id as key for value = token_data"
);
}
impl CurrentTokenRoyaltyV1Convertible for CurrentTokenRoyaltyV1 {
fn from_raw(raw_item: RawCurrentTokenRoyaltyV1) -> Self {
Self {
token_data_id: raw_item.token_data_id,
payee_address: raw_item.payee_address,
royalty_points_numerator: raw_item.royalty_points_numerator,
royalty_points_denominator: raw_item.royalty_points_denominator,
last_transaction_version: raw_item.last_transaction_version,
last_transaction_timestamp: raw_item.last_transaction_timestamp,
}
Ok(None)
}
}
25 changes: 17 additions & 8 deletions rust/processor/src/processors/token_v2_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,11 @@
use super::{DefaultProcessingResult, ProcessorName, ProcessorTrait};
use crate::{
db::{
common::models::token_v2_models::raw_token_claims::{
CurrentTokenPendingClaimConvertible, RawCurrentTokenPendingClaim, TokenV1Claimed,
common::models::token_v2_models::{
raw_token_claims::{
CurrentTokenPendingClaimConvertible, RawCurrentTokenPendingClaim, TokenV1Claimed,
},
raw_v1_token_royalty::{CurrentTokenRoyaltyV1Convertible, RawCurrentTokenRoyaltyV1},
},
postgres::models::{
fungible_asset_models::v2_fungible_asset_utils::FungibleAssetMetadata,
Expand Down Expand Up @@ -617,7 +620,7 @@ impl ProcessorTrait for TokenV2Processor {
current_deleted_token_ownerships_v2,
token_activities_v2,
mut current_token_v2_metadata,
current_token_royalties_v1,
raw_current_token_royalties_v1,
raw_current_token_claims,
) = parse_v2_token(
&transactions,
Expand All @@ -633,6 +636,12 @@ impl ProcessorTrait for TokenV2Processor {
.map(CurrentTokenPendingClaim::from_raw)
.collect();

let postgres_current_token_royalties_v1: Vec<CurrentTokenRoyaltyV1> =
raw_current_token_royalties_v1
.into_iter()
.map(CurrentTokenRoyaltyV1::from_raw)
.collect();

let processing_duration_in_secs = processing_start.elapsed().as_secs_f64();
let db_insertion_start = std::time::Instant::now();

Expand Down Expand Up @@ -671,7 +680,7 @@ impl ProcessorTrait for TokenV2Processor {
),
&token_activities_v2,
&current_token_v2_metadata,
&current_token_royalties_v1,
&postgres_current_token_royalties_v1,
&postgres_current_token_claims,
&self.per_table_chunk_sizes,
)
Expand Down Expand Up @@ -723,7 +732,7 @@ pub async fn parse_v2_token(
Vec<CurrentTokenOwnershipV2>, // deleted token ownerships
Vec<TokenActivityV2>,
Vec<CurrentTokenV2Metadata>,
Vec<CurrentTokenRoyaltyV1>,
Vec<RawCurrentTokenRoyaltyV1>,
Vec<RawCurrentTokenPendingClaim>,
) {
// Token V2 and V1 combined
Expand Down Expand Up @@ -752,7 +761,7 @@ pub async fn parse_v2_token(
// Basically token properties
let mut current_token_v2_metadata: AHashMap<CurrentTokenV2MetadataPK, CurrentTokenV2Metadata> =
AHashMap::new();
let mut current_token_royalties_v1: AHashMap<CurrentTokenDataV2PK, CurrentTokenRoyaltyV1> =
let mut current_token_royalties_v1: AHashMap<CurrentTokenDataV2PK, RawCurrentTokenRoyaltyV1> =
AHashMap::new();
// migrating this from v1 token model as we don't have any replacement table for this
let mut all_current_token_claims: AHashMap<
Expand Down Expand Up @@ -967,7 +976,7 @@ pub async fn parse_v2_token(
);
}
if let Some(current_token_royalty) =
CurrentTokenRoyaltyV1::get_v1_from_write_table_item(
RawCurrentTokenRoyaltyV1::get_v1_from_write_table_item(
table_item,
txn_version,
txn_timestamp,
Expand Down Expand Up @@ -1294,7 +1303,7 @@ pub async fn parse_v2_token(
.collect::<Vec<CurrentTokenOwnershipV2>>();
let mut current_token_royalties_v1 = current_token_royalties_v1
.into_values()
.collect::<Vec<CurrentTokenRoyaltyV1>>();
.collect::<Vec<RawCurrentTokenRoyaltyV1>>();
let mut all_current_token_claims = all_current_token_claims
.into_values()
.collect::<Vec<RawCurrentTokenPendingClaim>>();
Expand Down
1 change: 1 addition & 0 deletions rust/processor/src/utils/table_flags.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ bitflags! {
const COLLECTIONS_V2 = 1 << 57;
const TOKEN_OWNERSHIPS_V2 = 1 << 58;
const TOKEN_DATAS_V2 = 1 << 59;
const CURRENT_TOKEN_ROYALTY_V1 = 1 << 60;

// User Transactions and Signatures: 61-70
const USER_TRANSACTIONS = 1 << 61;
Expand Down
11 changes: 7 additions & 4 deletions rust/sdk-processor/src/config/processor_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@ use processor::{
},
parquet_v2_fungible_metadata::FungibleAssetMetadataModel,
},
token_v2_models::token_claims::CurrentTokenPendingClaim,
token_v2_models::{
token_claims::CurrentTokenPendingClaim, v1_token_royalty::CurrentTokenRoyaltyV1,
},
transaction_metadata_model::parquet_write_set_size_info::WriteSetSize,
user_transaction_models::parquet_user_transactions::UserTransaction,
},
Expand Down Expand Up @@ -166,9 +168,10 @@ impl ProcessorConfig {
ProcessorName::ParquetAccountTransactionsProcessor => {
HashSet::from([AccountTransaction::TABLE_NAME.to_string()])
},
ProcessorName::ParquetTokenV2Processor => {
HashSet::from([CurrentTokenPendingClaim::TABLE_NAME.to_string()])
},
ProcessorName::ParquetTokenV2Processor => HashSet::from([
CurrentTokenPendingClaim::TABLE_NAME.to_string(),
CurrentTokenRoyaltyV1::TABLE_NAME.to_string(),
]),
_ => HashSet::new(), // Default case for unsupported processors
}
}
Expand Down
Loading

0 comments on commit b5f6c15

Please sign in to comment.