Skip to content

Commit

Permalink
[parquet-sdk][token_v2] migrate TokenDataV2 and CurrentTokenDataV2
Browse files Browse the repository at this point in the history
  • Loading branch information
yuunlimm committed Dec 12, 2024
1 parent da99b1b commit c70afcd
Show file tree
Hide file tree
Showing 15 changed files with 537 additions and 398 deletions.
1 change: 1 addition & 0 deletions rust/processor/src/db/common/models/token_v2_models/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
pub mod raw_token_claims;
pub mod raw_v1_token_royalty;
pub mod raw_v2_token_activities;
pub mod raw_v2_token_datas;
pub mod raw_v2_token_metadata;
Original file line number Diff line number Diff line change
Expand Up @@ -6,63 +6,69 @@
#![allow(clippy::unused_unit)]

use crate::{
bq_analytics::generic_parquet_processor::{GetTimeStamp, HasVersion, NamedTable},
db::postgres::models::{
object_models::v2_object_utils::ObjectAggregatedDataMapping,
resources::FromWriteResource,
token_models::token_utils::TokenWriteSet,
token_v2_models::{
v2_token_datas::CurrentTokenDataV2,
v2_token_utils::{TokenStandard, TokenV2, TokenV2Burned},
},
token_v2_models::v2_token_utils::{TokenStandard, TokenV2, TokenV2Burned},
},
utils::util::standardize_address,
};
use allocative_derive::Allocative;
use anyhow::Context;
use aptos_protos::transaction::v1::{DeleteResource, WriteResource, WriteTableItem};
use bigdecimal::ToPrimitive;
use field_count::FieldCount;
use parquet_derive::ParquetRecordWriter;
use bigdecimal::BigDecimal;
use serde::{Deserialize, Serialize};

#[derive(
Allocative, Clone, Debug, Default, Deserialize, FieldCount, ParquetRecordWriter, Serialize,
)]
pub struct TokenDataV2 {
pub txn_version: i64,
#[derive(Clone, Debug, Default, Deserialize, Serialize)]
pub struct RawTokenDataV2 {
pub transaction_version: i64,
pub write_set_change_index: i64,
pub token_data_id: String,
pub collection_id: String,
pub token_name: String,
pub largest_property_version_v1: Option<u64>,
pub maximum: Option<BigDecimal>,
pub supply: Option<BigDecimal>,
pub largest_property_version_v1: Option<BigDecimal>,
pub token_uri: String,
pub token_properties: String,
pub token_properties: serde_json::Value,
pub description: String,
pub token_standard: String,
pub is_fungible_v2: Option<bool>,
#[allocative(skip)]
pub block_timestamp: chrono::NaiveDateTime,
pub transaction_timestamp: chrono::NaiveDateTime,
// Deprecated, but still here for backwards compatibility
pub decimals: Option<i64>,
// Here for consistency but we don't need to actually fill it
pub is_deleted_v2: Option<bool>,
}

impl NamedTable for TokenDataV2 {
const TABLE_NAME: &'static str = "token_datas_v2";
pub trait TokenDataV2Convertible {
fn from_raw(raw_item: RawTokenDataV2) -> Self;
}

impl HasVersion for TokenDataV2 {
fn version(&self) -> i64 {
self.txn_version
}
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct RawCurrentTokenDataV2 {
pub token_data_id: String,
pub collection_id: String,
pub token_name: String,
pub maximum: Option<BigDecimal>,
pub supply: Option<BigDecimal>,
pub largest_property_version_v1: Option<BigDecimal>,
pub token_uri: String,
pub token_properties: serde_json::Value,
pub description: String,
pub token_standard: String,
pub is_fungible_v2: Option<bool>,
pub last_transaction_version: i64,
pub last_transaction_timestamp: chrono::NaiveDateTime,
// Deprecated, but still here for backwards compatibility
pub decimals: Option<i64>,
pub is_deleted_v2: Option<bool>,
}

impl GetTimeStamp for TokenDataV2 {
fn get_timestamp(&self) -> chrono::NaiveDateTime {
self.block_timestamp
}
pub trait CurrentTokenDataV2Convertible {
fn from_raw(raw_item: RawCurrentTokenDataV2) -> Self;
}

impl TokenDataV2 {
impl RawTokenDataV2 {
// TODO: remove the useless_asref lint when new clippy nighly is released.
#[allow(clippy::useless_asref)]
pub fn get_v2_from_write_resource(
Expand All @@ -71,7 +77,7 @@ impl TokenDataV2 {
write_set_change_index: i64,
txn_timestamp: chrono::NaiveDateTime,
object_metadatas: &ObjectAggregatedDataMapping,
) -> anyhow::Result<Option<Self>> {
) -> anyhow::Result<Option<(Self, RawCurrentTokenDataV2)>> {
if let Some(inner) = &TokenV2::from_write_resource(write_resource)? {
let token_data_id = standardize_address(&write_resource.address.to_string());
let mut token_name = inner.get_name_trunc();
Expand Down Expand Up @@ -102,22 +108,43 @@ impl TokenDataV2 {
let collection_id = inner.get_collection_address();
let token_uri = inner.get_uri_trunc();

Ok(Some(Self {
txn_version,
write_set_change_index,
token_data_id: token_data_id.clone(),
collection_id: collection_id.clone(),
token_name: token_name.clone(),
largest_property_version_v1: None,
token_uri: token_uri.clone(),
token_properties: canonical_json::to_string(&token_properties.clone())
.context("Failed to serialize token properties")?,
description: inner.description.clone(),
token_standard: TokenStandard::V2.to_string(),
is_fungible_v2,
block_timestamp: txn_timestamp,
is_deleted_v2: None,
}))
Ok(Some((
Self {
transaction_version: txn_version,
write_set_change_index,
token_data_id: token_data_id.clone(),
collection_id: collection_id.clone(),
token_name: token_name.clone(),
maximum: None,
supply: None,
largest_property_version_v1: None,
token_uri: token_uri.clone(),
token_properties: token_properties.clone(),
description: inner.description.clone(),
token_standard: TokenStandard::V2.to_string(),
is_fungible_v2,
transaction_timestamp: txn_timestamp,
decimals: None,
is_deleted_v2: None,
},
RawCurrentTokenDataV2 {
token_data_id,
collection_id,
token_name,
maximum: None,
supply: None,
largest_property_version_v1: None,
token_uri,
token_properties,
description: inner.description.clone(),
token_standard: TokenStandard::V2.to_string(),
is_fungible_v2,
last_transaction_version: txn_version,
last_transaction_timestamp: txn_timestamp,
decimals: None,
is_deleted_v2: Some(false),
},
)))
} else {
Ok(None)
}
Expand All @@ -129,11 +156,11 @@ impl TokenDataV2 {
txn_version: i64,
txn_timestamp: chrono::NaiveDateTime,
tokens_burned: &TokenV2Burned,
) -> anyhow::Result<Option<CurrentTokenDataV2>> {
) -> anyhow::Result<Option<RawCurrentTokenDataV2>> {
let token_data_id = standardize_address(&write_resource.address.to_string());
// reminder that v1 events won't get to this codepath
if let Some(burn_event_v2) = tokens_burned.get(&standardize_address(&token_data_id)) {
Ok(Some(CurrentTokenDataV2 {
Ok(Some(RawCurrentTokenDataV2 {
token_data_id,
collection_id: burn_event_v2.get_collection_address(),
token_name: "".to_string(),
Expand Down Expand Up @@ -161,11 +188,11 @@ impl TokenDataV2 {
txn_version: i64,
txn_timestamp: chrono::NaiveDateTime,
tokens_burned: &TokenV2Burned,
) -> anyhow::Result<Option<CurrentTokenDataV2>> {
) -> anyhow::Result<Option<RawCurrentTokenDataV2>> {
let token_data_id = standardize_address(&delete_resource.address.to_string());
// reminder that v1 events won't get to this codepath
if let Some(burn_event_v2) = tokens_burned.get(&standardize_address(&token_data_id)) {
Ok(Some(CurrentTokenDataV2 {
Ok(Some(RawCurrentTokenDataV2 {
token_data_id,
collection_id: burn_event_v2.get_collection_address(),
token_name: "".to_string(),
Expand All @@ -192,7 +219,7 @@ impl TokenDataV2 {
txn_version: i64,
write_set_change_index: i64,
txn_timestamp: chrono::NaiveDateTime,
) -> anyhow::Result<Option<Self>> {
) -> anyhow::Result<Option<(Self, RawCurrentTokenDataV2)>> {
let table_item_data = table_item.data.as_ref().unwrap();

let maybe_token_data = match TokenWriteSet::from_table_item_type(
Expand All @@ -219,30 +246,45 @@ impl TokenDataV2 {
let token_name = token_data_id_struct.get_name_trunc();
let token_uri = token_data.get_uri_trunc();

return Ok(Some(Self {
txn_version,
write_set_change_index,
token_data_id: token_data_id.clone(),
collection_id: collection_id.clone(),
token_name: token_name.clone(),
largest_property_version_v1: Some(
token_data
.largest_property_version
.clone()
.to_u64()
.unwrap(),
),
token_uri: token_uri.clone(),
token_properties: canonical_json::to_string(
&token_data.default_properties.clone(),
)
.context("Failed to serialize token properties")?,
description: token_data.description.clone(),
token_standard: TokenStandard::V1.to_string(),
is_fungible_v2: None,
block_timestamp: txn_timestamp,
is_deleted_v2: None,
}));
return Ok(Some((
Self {
transaction_version: txn_version,
write_set_change_index,
token_data_id: token_data_id.clone(),
collection_id: collection_id.clone(),
token_name: token_name.clone(),
maximum: Some(token_data.maximum.clone()),
supply: Some(token_data.supply.clone()),
largest_property_version_v1: Some(
token_data.largest_property_version.clone(),
),
token_uri: token_uri.clone(),
token_properties: token_data.default_properties.clone(),
description: token_data.description.clone(),
token_standard: TokenStandard::V1.to_string(),
is_fungible_v2: None,
transaction_timestamp: txn_timestamp,
decimals: None,
is_deleted_v2: None,
},
RawCurrentTokenDataV2 {
token_data_id,
collection_id,
token_name,
maximum: Some(token_data.maximum),
supply: Some(token_data.supply),
largest_property_version_v1: Some(token_data.largest_property_version),
token_uri,
token_properties: token_data.default_properties,
description: token_data.description,
token_standard: TokenStandard::V1.to_string(),
is_fungible_v2: None,
last_transaction_version: txn_version,
last_transaction_timestamp: txn_timestamp,
decimals: None,
is_deleted_v2: None,
},
)));
} else {
tracing::warn!(
transaction_version = txn_version,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
pub mod token_claims;
pub mod v1_token_royalty;
pub mod v2_token_activities;
pub mod v2_token_datas;
pub mod v2_token_metadata;

pub mod v2_token_ownerships;
Loading

0 comments on commit c70afcd

Please sign in to comment.