From 69d5c9bd5366f0c8528cb65001b7288f102629a8 Mon Sep 17 00:00:00 2001 From: yuunlimm Date: Tue, 10 Dec 2024 11:28:25 -0800 Subject: [PATCH] [parquet-sdk][token_v2] migrate TokenDataV2 and CurrentTokenDataV2 --- .../db/common/models/token_v2_models/mod.rs | 1 + .../token_v2_models/raw_v2_token_datas.rs} | 194 ++++++++----- .../db/parquet/models/token_v2_models/mod.rs | 2 +- .../models/token_v2_models/v2_token_datas.rs | 140 +++++++++ .../token_v2_models/v2_token_ownerships.rs | 12 +- .../db/postgres/models/token_v2_models/mod.rs | 4 - .../models/token_v2_models/v2_token_datas.rs | 273 +++--------------- .../src/processors/nft_metadata_processor.rs | 48 +-- .../parquet_token_v2_processor.rs | 57 ++-- .../src/processors/token_v2_processor.rs | 63 ++-- .../src/config/processor_config.rs | 9 +- .../src/parquet_processors/mod.rs | 29 +- .../parquet_token_v2_processor.rs | 16 +- .../parquet_token_v2_extractor.rs | 58 +++- .../token_v2_processor/token_v2_extractor.rs | 29 +- 15 files changed, 537 insertions(+), 398 deletions(-) rename rust/processor/src/db/{postgres/models/token_v2_models/parquet_v2_token_datas.rs => common/models/token_v2_models/raw_v2_token_datas.rs} (58%) create mode 100644 rust/processor/src/db/parquet/models/token_v2_models/v2_token_datas.rs diff --git a/rust/processor/src/db/common/models/token_v2_models/mod.rs b/rust/processor/src/db/common/models/token_v2_models/mod.rs index 9046023e2..b8ae30ccb 100644 --- a/rust/processor/src/db/common/models/token_v2_models/mod.rs +++ b/rust/processor/src/db/common/models/token_v2_models/mod.rs @@ -1,4 +1,5 @@ pub mod raw_token_claims; pub mod raw_v1_token_royalty; pub mod raw_v2_token_activities; +pub mod raw_v2_token_datas; pub mod raw_v2_token_metadata; diff --git a/rust/processor/src/db/postgres/models/token_v2_models/parquet_v2_token_datas.rs b/rust/processor/src/db/common/models/token_v2_models/raw_v2_token_datas.rs similarity index 58% rename from rust/processor/src/db/postgres/models/token_v2_models/parquet_v2_token_datas.rs rename to rust/processor/src/db/common/models/token_v2_models/raw_v2_token_datas.rs index 300abd189..8b3c85018 100644 --- a/rust/processor/src/db/postgres/models/token_v2_models/parquet_v2_token_datas.rs +++ b/rust/processor/src/db/common/models/token_v2_models/raw_v2_token_datas.rs @@ -6,63 +6,69 @@ #![allow(clippy::unused_unit)] use crate::{ - bq_analytics::generic_parquet_processor::{GetTimeStamp, HasVersion, NamedTable}, db::postgres::models::{ object_models::v2_object_utils::ObjectAggregatedDataMapping, resources::FromWriteResource, token_models::token_utils::TokenWriteSet, - token_v2_models::{ - v2_token_datas::CurrentTokenDataV2, - v2_token_utils::{TokenStandard, TokenV2, TokenV2Burned}, - }, + token_v2_models::v2_token_utils::{TokenStandard, TokenV2, TokenV2Burned}, }, utils::util::standardize_address, }; -use allocative_derive::Allocative; -use anyhow::Context; use aptos_protos::transaction::v1::{DeleteResource, WriteResource, WriteTableItem}; -use bigdecimal::ToPrimitive; -use field_count::FieldCount; -use parquet_derive::ParquetRecordWriter; +use bigdecimal::BigDecimal; use serde::{Deserialize, Serialize}; -#[derive( - Allocative, Clone, Debug, Default, Deserialize, FieldCount, ParquetRecordWriter, Serialize, -)] -pub struct TokenDataV2 { - pub txn_version: i64, +#[derive(Clone, Debug, Default, Deserialize, Serialize)] +pub struct RawTokenDataV2 { + pub transaction_version: i64, pub write_set_change_index: i64, pub token_data_id: String, pub collection_id: String, pub token_name: String, - pub 
largest_property_version_v1: Option<u64>,
+    pub maximum: Option<BigDecimal>,
+    pub supply: Option<BigDecimal>,
+    pub largest_property_version_v1: Option<BigDecimal>,
     pub token_uri: String,
-    pub token_properties: String,
+    pub token_properties: serde_json::Value,
     pub description: String,
     pub token_standard: String,
     pub is_fungible_v2: Option<bool>,
-    #[allocative(skip)]
-    pub block_timestamp: chrono::NaiveDateTime,
+    pub transaction_timestamp: chrono::NaiveDateTime,
+    // Deprecated, but still here for backwards compatibility
+    pub decimals: Option<i64>,
+    // Here for consistency but we don't need to actually fill it
     pub is_deleted_v2: Option<bool>,
 }
 
-impl NamedTable for TokenDataV2 {
-    const TABLE_NAME: &'static str = "token_datas_v2";
+pub trait TokenDataV2Convertible {
+    fn from_raw(raw_item: RawTokenDataV2) -> Self;
 }
 
-impl HasVersion for TokenDataV2 {
-    fn version(&self) -> i64 {
-        self.txn_version
-    }
+#[derive(Clone, Debug, Deserialize, Serialize)]
+pub struct RawCurrentTokenDataV2 {
+    pub token_data_id: String,
+    pub collection_id: String,
+    pub token_name: String,
+    pub maximum: Option<BigDecimal>,
+    pub supply: Option<BigDecimal>,
+    pub largest_property_version_v1: Option<BigDecimal>,
+    pub token_uri: String,
+    pub token_properties: serde_json::Value,
+    pub description: String,
+    pub token_standard: String,
+    pub is_fungible_v2: Option<bool>,
+    pub last_transaction_version: i64,
+    pub last_transaction_timestamp: chrono::NaiveDateTime,
+    // Deprecated, but still here for backwards compatibility
+    pub decimals: Option<i64>,
+    pub is_deleted_v2: Option<bool>,
 }
 
-impl GetTimeStamp for TokenDataV2 {
-    fn get_timestamp(&self) -> chrono::NaiveDateTime {
-        self.block_timestamp
-    }
+pub trait CurrentTokenDataV2Convertible {
+    fn from_raw(raw_item: RawCurrentTokenDataV2) -> Self;
 }
 
-impl TokenDataV2 {
+impl RawTokenDataV2 {
     // TODO: remove the useless_asref lint when new clippy nighly is released.
     #[allow(clippy::useless_asref)]
     pub fn get_v2_from_write_resource(
@@ -71,7 +77,7 @@ impl TokenDataV2 {
         write_set_change_index: i64,
         txn_timestamp: chrono::NaiveDateTime,
         object_metadatas: &ObjectAggregatedDataMapping,
-    ) -> anyhow::Result<Option<Self>> {
+    ) -> anyhow::Result<Option<(Self, RawCurrentTokenDataV2)>> {
         if let Some(inner) = &TokenV2::from_write_resource(write_resource)? 
{ let token_data_id = standardize_address(&write_resource.address.to_string()); let mut token_name = inner.get_name_trunc(); @@ -102,22 +108,43 @@ impl TokenDataV2 { let collection_id = inner.get_collection_address(); let token_uri = inner.get_uri_trunc(); - Ok(Some(Self { - txn_version, - write_set_change_index, - token_data_id: token_data_id.clone(), - collection_id: collection_id.clone(), - token_name: token_name.clone(), - largest_property_version_v1: None, - token_uri: token_uri.clone(), - token_properties: canonical_json::to_string(&token_properties.clone()) - .context("Failed to serialize token properties")?, - description: inner.description.clone(), - token_standard: TokenStandard::V2.to_string(), - is_fungible_v2, - block_timestamp: txn_timestamp, - is_deleted_v2: None, - })) + Ok(Some(( + Self { + transaction_version: txn_version, + write_set_change_index, + token_data_id: token_data_id.clone(), + collection_id: collection_id.clone(), + token_name: token_name.clone(), + maximum: None, + supply: None, + largest_property_version_v1: None, + token_uri: token_uri.clone(), + token_properties: token_properties.clone(), + description: inner.description.clone(), + token_standard: TokenStandard::V2.to_string(), + is_fungible_v2, + transaction_timestamp: txn_timestamp, + decimals: None, + is_deleted_v2: None, + }, + RawCurrentTokenDataV2 { + token_data_id, + collection_id, + token_name, + maximum: None, + supply: None, + largest_property_version_v1: None, + token_uri, + token_properties, + description: inner.description.clone(), + token_standard: TokenStandard::V2.to_string(), + is_fungible_v2, + last_transaction_version: txn_version, + last_transaction_timestamp: txn_timestamp, + decimals: None, + is_deleted_v2: Some(false), + }, + ))) } else { Ok(None) } @@ -129,11 +156,11 @@ impl TokenDataV2 { txn_version: i64, txn_timestamp: chrono::NaiveDateTime, tokens_burned: &TokenV2Burned, - ) -> anyhow::Result> { + ) -> anyhow::Result> { let token_data_id = standardize_address(&write_resource.address.to_string()); // reminder that v1 events won't get to this codepath if let Some(burn_event_v2) = tokens_burned.get(&standardize_address(&token_data_id)) { - Ok(Some(CurrentTokenDataV2 { + Ok(Some(RawCurrentTokenDataV2 { token_data_id, collection_id: burn_event_v2.get_collection_address(), token_name: "".to_string(), @@ -161,11 +188,11 @@ impl TokenDataV2 { txn_version: i64, txn_timestamp: chrono::NaiveDateTime, tokens_burned: &TokenV2Burned, - ) -> anyhow::Result> { + ) -> anyhow::Result> { let token_data_id = standardize_address(&delete_resource.address.to_string()); // reminder that v1 events won't get to this codepath if let Some(burn_event_v2) = tokens_burned.get(&standardize_address(&token_data_id)) { - Ok(Some(CurrentTokenDataV2 { + Ok(Some(RawCurrentTokenDataV2 { token_data_id, collection_id: burn_event_v2.get_collection_address(), token_name: "".to_string(), @@ -192,7 +219,7 @@ impl TokenDataV2 { txn_version: i64, write_set_change_index: i64, txn_timestamp: chrono::NaiveDateTime, - ) -> anyhow::Result> { + ) -> anyhow::Result> { let table_item_data = table_item.data.as_ref().unwrap(); let maybe_token_data = match TokenWriteSet::from_table_item_type( @@ -219,30 +246,45 @@ impl TokenDataV2 { let token_name = token_data_id_struct.get_name_trunc(); let token_uri = token_data.get_uri_trunc(); - return Ok(Some(Self { - txn_version, - write_set_change_index, - token_data_id: token_data_id.clone(), - collection_id: collection_id.clone(), - token_name: token_name.clone(), - 
largest_property_version_v1: Some( - token_data - .largest_property_version - .clone() - .to_u64() - .unwrap(), - ), - token_uri: token_uri.clone(), - token_properties: canonical_json::to_string( - &token_data.default_properties.clone(), - ) - .context("Failed to serialize token properties")?, - description: token_data.description.clone(), - token_standard: TokenStandard::V1.to_string(), - is_fungible_v2: None, - block_timestamp: txn_timestamp, - is_deleted_v2: None, - })); + return Ok(Some(( + Self { + transaction_version: txn_version, + write_set_change_index, + token_data_id: token_data_id.clone(), + collection_id: collection_id.clone(), + token_name: token_name.clone(), + maximum: Some(token_data.maximum.clone()), + supply: Some(token_data.supply.clone()), + largest_property_version_v1: Some( + token_data.largest_property_version.clone(), + ), + token_uri: token_uri.clone(), + token_properties: token_data.default_properties.clone(), + description: token_data.description.clone(), + token_standard: TokenStandard::V1.to_string(), + is_fungible_v2: None, + transaction_timestamp: txn_timestamp, + decimals: None, + is_deleted_v2: None, + }, + RawCurrentTokenDataV2 { + token_data_id, + collection_id, + token_name, + maximum: Some(token_data.maximum), + supply: Some(token_data.supply), + largest_property_version_v1: Some(token_data.largest_property_version), + token_uri, + token_properties: token_data.default_properties, + description: token_data.description, + token_standard: TokenStandard::V1.to_string(), + is_fungible_v2: None, + last_transaction_version: txn_version, + last_transaction_timestamp: txn_timestamp, + decimals: None, + is_deleted_v2: None, + }, + ))); } else { tracing::warn!( transaction_version = txn_version, diff --git a/rust/processor/src/db/parquet/models/token_v2_models/mod.rs b/rust/processor/src/db/parquet/models/token_v2_models/mod.rs index 0d1a81185..1a959578f 100644 --- a/rust/processor/src/db/parquet/models/token_v2_models/mod.rs +++ b/rust/processor/src/db/parquet/models/token_v2_models/mod.rs @@ -1,6 +1,6 @@ pub mod token_claims; pub mod v1_token_royalty; pub mod v2_token_activities; +pub mod v2_token_datas; pub mod v2_token_metadata; - pub mod v2_token_ownerships; diff --git a/rust/processor/src/db/parquet/models/token_v2_models/v2_token_datas.rs b/rust/processor/src/db/parquet/models/token_v2_models/v2_token_datas.rs new file mode 100644 index 000000000..9c015c35a --- /dev/null +++ b/rust/processor/src/db/parquet/models/token_v2_models/v2_token_datas.rs @@ -0,0 +1,140 @@ +// Copyright © Aptos Foundation +// SPDX-License-Identifier: Apache-2.0 + +// This is required because a diesel macro makes clippy sad +#![allow(clippy::extra_unused_lifetimes)] +#![allow(clippy::unused_unit)] + +use crate::{ + bq_analytics::generic_parquet_processor::{GetTimeStamp, HasVersion, NamedTable}, + db::common::models::token_v2_models::raw_v2_token_datas::{ + CurrentTokenDataV2Convertible, RawCurrentTokenDataV2, RawTokenDataV2, + TokenDataV2Convertible, + }, +}; +use allocative_derive::Allocative; +use anyhow::Context; +use bigdecimal::ToPrimitive; +use parquet_derive::ParquetRecordWriter; +use serde::{Deserialize, Serialize}; + +#[derive(Allocative, Clone, Debug, Default, Deserialize, ParquetRecordWriter, Serialize)] +pub struct TokenDataV2 { + pub txn_version: i64, + pub write_set_change_index: i64, + pub token_data_id: String, + pub collection_id: String, + pub token_name: String, + pub largest_property_version_v1: Option, + pub token_uri: String, + pub token_properties: String, + 
pub description: String,
+    pub token_standard: String,
+    pub is_fungible_v2: Option<bool>,
+    #[allocative(skip)]
+    pub block_timestamp: chrono::NaiveDateTime,
+    pub is_deleted_v2: Option<bool>,
+}
+
+impl NamedTable for TokenDataV2 {
+    const TABLE_NAME: &'static str = "token_datas_v2";
+}
+
+impl HasVersion for TokenDataV2 {
+    fn version(&self) -> i64 {
+        self.txn_version
+    }
+}
+
+impl GetTimeStamp for TokenDataV2 {
+    fn get_timestamp(&self) -> chrono::NaiveDateTime {
+        self.block_timestamp
+    }
+}
+
+impl TokenDataV2Convertible for TokenDataV2 {
+    fn from_raw(raw_item: RawTokenDataV2) -> Self {
+        Self {
+            txn_version: raw_item.transaction_version,
+            write_set_change_index: raw_item.write_set_change_index,
+            token_data_id: raw_item.token_data_id,
+            collection_id: raw_item.collection_id,
+            token_name: raw_item.token_name,
+            // Only set for token v1. Map through the Option instead of
+            // unwrapping: for v2 tokens the raw value is None, and an
+            // unwrap here would panic.
+            largest_property_version_v1: raw_item
+                .largest_property_version_v1
+                .as_ref()
+                .and_then(|v| v.to_u64()),
+            token_uri: raw_item.token_uri,
+            token_properties: canonical_json::to_string(&raw_item.token_properties.clone())
+                .context("Failed to serialize token properties")
+                .unwrap(),
+            description: raw_item.description,
+            token_standard: raw_item.token_standard,
+            is_fungible_v2: raw_item.is_fungible_v2,
+            block_timestamp: raw_item.transaction_timestamp,
+            is_deleted_v2: raw_item.is_deleted_v2,
+        }
+    }
+}
+
+#[derive(Allocative, Clone, Debug, Default, Deserialize, ParquetRecordWriter, Serialize)]
+pub struct CurrentTokenDataV2 {
+    pub token_data_id: String,
+    pub collection_id: String,
+    pub token_name: String,
+    pub maximum: Option<String>, // BigDecimal
+    pub supply: Option<String>,
+    pub largest_property_version_v1: Option<u64>,
+    pub token_uri: String,
+    pub token_properties: String, // serde_json::Value,
+    pub description: String,
+    pub token_standard: String,
+    pub is_fungible_v2: Option<bool>,
+    pub last_transaction_version: i64,
+    #[allocative(skip)]
+    pub last_transaction_timestamp: chrono::NaiveDateTime,
+    // Deprecated, but still here for backwards compatibility
+    pub decimals: Option<i64>,
+    pub is_deleted_v2: Option<bool>,
+}
+
+impl NamedTable for CurrentTokenDataV2 {
+    const TABLE_NAME: &'static str = "current_token_datas_v2";
+}
+
+impl HasVersion for CurrentTokenDataV2 {
+    fn version(&self) -> i64 {
+        self.last_transaction_version
+    }
+}
+
+impl GetTimeStamp for CurrentTokenDataV2 {
+    fn get_timestamp(&self) -> chrono::NaiveDateTime {
+        self.last_transaction_timestamp
+    }
+}
+
+impl CurrentTokenDataV2Convertible for CurrentTokenDataV2 {
+    fn from_raw(raw_item: RawCurrentTokenDataV2) -> Self {
+        Self {
+            token_data_id: raw_item.token_data_id,
+            collection_id: raw_item.collection_id,
+            token_name: raw_item.token_name,
+            maximum: raw_item.maximum.map(|v| v.to_string()), // only set for token v1
+            supply: raw_item.supply.map(|v| v.to_string()),   // only set for token v1
+            largest_property_version_v1: raw_item.largest_property_version_v1.and_then(|v| v.to_u64()),
+            token_uri: raw_item.token_uri,
+            token_properties: canonical_json::to_string(&raw_item.token_properties).unwrap(), // todo: handle error
+            description: raw_item.description,
+            token_standard: raw_item.token_standard,
+            is_fungible_v2: raw_item.is_fungible_v2,
+            last_transaction_version: raw_item.last_transaction_version,
+            last_transaction_timestamp: raw_item.last_transaction_timestamp,
+            decimals: raw_item.decimals,
+            is_deleted_v2: raw_item.is_deleted_v2,
+        }
+    }
+}
diff --git a/rust/processor/src/db/parquet/models/token_v2_models/v2_token_ownerships.rs b/rust/processor/src/db/parquet/models/token_v2_models/v2_token_ownerships.rs
index 
13896f33d..74516dd51 100644 --- a/rust/processor/src/db/parquet/models/token_v2_models/v2_token_ownerships.rs +++ b/rust/processor/src/db/parquet/models/token_v2_models/v2_token_ownerships.rs @@ -8,13 +8,13 @@ use crate::{ bq_analytics::generic_parquet_processor::{GetTimeStamp, HasVersion, NamedTable}, db::{ + common::models::token_v2_models::raw_v2_token_datas::RawTokenDataV2, parquet::models::fungible_asset_models::parquet_v2_fungible_asset_balances::DEFAULT_AMOUNT_VALUE, postgres::models::{ object_models::v2_object_utils::{ObjectAggregatedDataMapping, ObjectWithMetadata}, resources::FromWriteResource, token_models::{token_utils::TokenWriteSet, tokens::TableHandleToOwner}, token_v2_models::{ - parquet_v2_token_datas::TokenDataV2, v2_token_ownerships::{CurrentTokenOwnershipV2, NFTOwnershipV2}, v2_token_utils::{TokenStandard, TokenV2Burned, DEFAULT_OWNER_ADDRESS}, }, @@ -75,7 +75,7 @@ impl TokenOwnershipV2 { /// For nfts it's the same resources that we parse tokendatas from so we leverage the work done in there to get ownership data /// Vecs are returned because there could be multiple transfers and we need to document each one here. pub fn get_nft_v2_from_token_data( - token_data: &TokenDataV2, + token_data: &RawTokenDataV2, object_metadatas: &ObjectAggregatedDataMapping, ) -> anyhow::Result> { let mut ownerships = vec![]; @@ -99,7 +99,7 @@ impl TokenOwnershipV2 { let non_transferrable_by_owner = !object_core.allow_ungated_transfer; ownerships.push(Self { - txn_version: token_data.txn_version, + txn_version: token_data.transaction_version, write_set_change_index: token_data.write_set_change_index, token_data_id: token_data_id.clone(), property_version_v1: LEGACY_DEFAULT_PROPERTY_VERSION, @@ -110,7 +110,7 @@ impl TokenOwnershipV2 { token_properties_mutated_v1: None, is_soulbound_v2: Some(is_soulbound), token_standard: TokenStandard::V2.to_string(), - block_timestamp: token_data.block_timestamp, + block_timestamp: token_data.transaction_timestamp, non_transferrable_by_owner: Some(non_transferrable_by_owner), }); @@ -121,7 +121,7 @@ impl TokenOwnershipV2 { continue; } ownerships.push(Self { - txn_version: token_data.txn_version, + txn_version: token_data.transaction_version, // set to negative of event index to avoid collison with write set index write_set_change_index: -1 * event_index, token_data_id: token_data_id.clone(), @@ -135,7 +135,7 @@ impl TokenOwnershipV2 { token_properties_mutated_v1: None, is_soulbound_v2: Some(is_soulbound), token_standard: TokenStandard::V2.to_string(), - block_timestamp: token_data.block_timestamp, + block_timestamp: token_data.transaction_timestamp, non_transferrable_by_owner: Some(is_soulbound), }); } diff --git a/rust/processor/src/db/postgres/models/token_v2_models/mod.rs b/rust/processor/src/db/postgres/models/token_v2_models/mod.rs index 568250b2d..49bd71da5 100644 --- a/rust/processor/src/db/postgres/models/token_v2_models/mod.rs +++ b/rust/processor/src/db/postgres/models/token_v2_models/mod.rs @@ -8,7 +8,3 @@ pub mod v2_token_datas; pub mod v2_token_metadata; pub mod v2_token_ownerships; pub mod v2_token_utils; - -// parquet models -// pub mod parquet_v2_collections; // revisit this -pub mod parquet_v2_token_datas; diff --git a/rust/processor/src/db/postgres/models/token_v2_models/v2_token_datas.rs b/rust/processor/src/db/postgres/models/token_v2_models/v2_token_datas.rs index 577d23caa..8490f4c45 100644 --- a/rust/processor/src/db/postgres/models/token_v2_models/v2_token_datas.rs +++ 
b/rust/processor/src/db/postgres/models/token_v2_models/v2_token_datas.rs @@ -5,16 +5,13 @@ #![allow(clippy::extra_unused_lifetimes)] #![allow(clippy::unused_unit)] -use super::v2_token_utils::{TokenStandard, TokenV2, TokenV2Burned}; use crate::{ - db::postgres::models::{ - object_models::v2_object_utils::ObjectAggregatedDataMapping, resources::FromWriteResource, - token_models::token_utils::TokenWriteSet, + db::common::models::token_v2_models::raw_v2_token_datas::{ + CurrentTokenDataV2Convertible, RawCurrentTokenDataV2, RawTokenDataV2, + TokenDataV2Convertible, }, schema::{current_token_datas_v2, token_datas_v2}, - utils::util::standardize_address, }; -use aptos_protos::transaction::v1::{DeleteResource, WriteResource, WriteTableItem}; use bigdecimal::BigDecimal; use diesel::prelude::*; use field_count::FieldCount; @@ -47,6 +44,28 @@ pub struct TokenDataV2 { // pub is_deleted_v2: Option, } +impl TokenDataV2Convertible for TokenDataV2 { + fn from_raw(raw_item: RawTokenDataV2) -> Self { + Self { + transaction_version: raw_item.transaction_version, + write_set_change_index: raw_item.write_set_change_index, + token_data_id: raw_item.token_data_id, + collection_id: raw_item.collection_id, + token_name: raw_item.token_name, + maximum: raw_item.maximum, + supply: raw_item.supply, + largest_property_version_v1: raw_item.largest_property_version_v1, + token_uri: raw_item.token_uri, + token_properties: raw_item.token_properties, + description: raw_item.description, + token_standard: raw_item.token_standard, + is_fungible_v2: raw_item.is_fungible_v2, + transaction_timestamp: raw_item.transaction_timestamp, + decimals: raw_item.decimals, + } + } +} + #[derive(Clone, Debug, Deserialize, FieldCount, Identifiable, Insertable, Serialize)] #[diesel(primary_key(token_data_id))] #[diesel(table_name = current_token_datas_v2)] @@ -69,230 +88,24 @@ pub struct CurrentTokenDataV2 { pub is_deleted_v2: Option, } -impl TokenDataV2 { - // TODO: remove the useless_asref lint when new clippy nighly is released. - #[allow(clippy::useless_asref)] - pub fn get_v2_from_write_resource( - write_resource: &WriteResource, - txn_version: i64, - write_set_change_index: i64, - txn_timestamp: chrono::NaiveDateTime, - object_metadatas: &ObjectAggregatedDataMapping, - ) -> anyhow::Result> { - if let Some(inner) = &TokenV2::from_write_resource(write_resource)? 
{ - let token_data_id = standardize_address(&write_resource.address.to_string()); - let mut token_name = inner.get_name_trunc(); - let is_fungible_v2; - // Get token properties from 0x4::property_map::PropertyMap - let mut token_properties = serde_json::Value::Null; - if let Some(object_metadata) = object_metadatas.get(&token_data_id) { - let fungible_asset_metadata = object_metadata.fungible_asset_metadata.as_ref(); - if fungible_asset_metadata.is_some() { - is_fungible_v2 = Some(true); - } else { - is_fungible_v2 = Some(false); - } - token_properties = object_metadata - .property_map - .as_ref() - .map(|m| m.inner.clone()) - .unwrap_or(token_properties); - // In aggregator V2 name is now derived from a separate struct - if let Some(token_identifier) = object_metadata.token_identifier.as_ref() { - token_name = token_identifier.get_name_trunc(); - } - } else { - // ObjectCore should not be missing, returning from entire function early - return Ok(None); - } - - let collection_id = inner.get_collection_address(); - let token_uri = inner.get_uri_trunc(); - - Ok(Some(( - Self { - transaction_version: txn_version, - write_set_change_index, - token_data_id: token_data_id.clone(), - collection_id: collection_id.clone(), - token_name: token_name.clone(), - maximum: None, - supply: None, - largest_property_version_v1: None, - token_uri: token_uri.clone(), - token_properties: token_properties.clone(), - description: inner.description.clone(), - token_standard: TokenStandard::V2.to_string(), - is_fungible_v2, - transaction_timestamp: txn_timestamp, - decimals: None, - }, - CurrentTokenDataV2 { - token_data_id, - collection_id, - token_name, - maximum: None, - supply: None, - largest_property_version_v1: None, - token_uri, - token_properties, - description: inner.description.clone(), - token_standard: TokenStandard::V2.to_string(), - is_fungible_v2, - last_transaction_version: txn_version, - last_transaction_timestamp: txn_timestamp, - decimals: None, - is_deleted_v2: Some(false), - }, - ))) - } else { - Ok(None) - } - } - - /// This handles the case where token is burned but objectCore is still there - pub async fn get_burned_nft_v2_from_write_resource( - write_resource: &WriteResource, - txn_version: i64, - txn_timestamp: chrono::NaiveDateTime, - tokens_burned: &TokenV2Burned, - ) -> anyhow::Result> { - let token_data_id = standardize_address(&write_resource.address.to_string()); - // reminder that v1 events won't get to this codepath - if let Some(burn_event_v2) = tokens_burned.get(&standardize_address(&token_data_id)) { - Ok(Some(CurrentTokenDataV2 { - token_data_id, - collection_id: burn_event_v2.get_collection_address(), - token_name: "".to_string(), - maximum: None, - supply: None, - largest_property_version_v1: None, - token_uri: "".to_string(), - token_properties: serde_json::Value::Null, - description: "".to_string(), - token_standard: TokenStandard::V2.to_string(), - is_fungible_v2: Some(false), - last_transaction_version: txn_version, - last_transaction_timestamp: txn_timestamp, - decimals: None, - is_deleted_v2: Some(true), - })) - } else { - Ok(None) - } - } - - /// This handles the case where token is burned and objectCore is deleted - pub async fn get_burned_nft_v2_from_delete_resource( - delete_resource: &DeleteResource, - txn_version: i64, - txn_timestamp: chrono::NaiveDateTime, - tokens_burned: &TokenV2Burned, - ) -> anyhow::Result> { - let token_data_id = standardize_address(&delete_resource.address.to_string()); - // reminder that v1 events won't get to this codepath - if let 
Some(burn_event_v2) = tokens_burned.get(&standardize_address(&token_data_id)) { - Ok(Some(CurrentTokenDataV2 { - token_data_id, - collection_id: burn_event_v2.get_collection_address(), - token_name: "".to_string(), - maximum: None, - supply: None, - largest_property_version_v1: None, - token_uri: "".to_string(), - token_properties: serde_json::Value::Null, - description: "".to_string(), - token_standard: TokenStandard::V2.to_string(), - is_fungible_v2: Some(false), - last_transaction_version: txn_version, - last_transaction_timestamp: txn_timestamp, - decimals: None, - is_deleted_v2: Some(true), - })) - } else { - Ok(None) - } - } - - pub fn get_v1_from_write_table_item( - table_item: &WriteTableItem, - txn_version: i64, - write_set_change_index: i64, - txn_timestamp: chrono::NaiveDateTime, - ) -> anyhow::Result> { - let table_item_data = table_item.data.as_ref().unwrap(); - - let maybe_token_data = match TokenWriteSet::from_table_item_type( - table_item_data.value_type.as_str(), - &table_item_data.value, - txn_version, - )? { - Some(TokenWriteSet::TokenData(inner)) => Some(inner), - _ => None, - }; - - if let Some(token_data) = maybe_token_data { - let maybe_token_data_id = match TokenWriteSet::from_table_item_type( - table_item_data.key_type.as_str(), - &table_item_data.key, - txn_version, - )? { - Some(TokenWriteSet::TokenDataId(inner)) => Some(inner), - _ => None, - }; - if let Some(token_data_id_struct) = maybe_token_data_id { - let collection_id = token_data_id_struct.get_collection_id(); - let token_data_id = token_data_id_struct.to_id(); - let token_name = token_data_id_struct.get_name_trunc(); - let token_uri = token_data.get_uri_trunc(); - - return Ok(Some(( - Self { - transaction_version: txn_version, - write_set_change_index, - token_data_id: token_data_id.clone(), - collection_id: collection_id.clone(), - token_name: token_name.clone(), - maximum: Some(token_data.maximum.clone()), - supply: Some(token_data.supply.clone()), - largest_property_version_v1: Some( - token_data.largest_property_version.clone(), - ), - token_uri: token_uri.clone(), - token_properties: token_data.default_properties.clone(), - description: token_data.description.clone(), - token_standard: TokenStandard::V1.to_string(), - is_fungible_v2: None, - transaction_timestamp: txn_timestamp, - decimals: None, - }, - CurrentTokenDataV2 { - token_data_id, - collection_id, - token_name, - maximum: Some(token_data.maximum), - supply: Some(token_data.supply), - largest_property_version_v1: Some(token_data.largest_property_version), - token_uri, - token_properties: token_data.default_properties, - description: token_data.description, - token_standard: TokenStandard::V1.to_string(), - is_fungible_v2: None, - last_transaction_version: txn_version, - last_transaction_timestamp: txn_timestamp, - decimals: None, - is_deleted_v2: None, - }, - ))); - } else { - tracing::warn!( - transaction_version = txn_version, - key_type = table_item_data.key_type, - key = table_item_data.key, - "Expecting token_data_id as key for value = token_data" - ); - } +impl CurrentTokenDataV2Convertible for CurrentTokenDataV2 { + fn from_raw(raw_item: RawCurrentTokenDataV2) -> Self { + Self { + token_data_id: raw_item.token_data_id, + collection_id: raw_item.collection_id, + token_name: raw_item.token_name, + maximum: raw_item.maximum, + supply: raw_item.supply, + largest_property_version_v1: raw_item.largest_property_version_v1, + token_uri: raw_item.token_uri, + token_properties: raw_item.token_properties, + description: raw_item.description, + 
token_standard: raw_item.token_standard, + is_fungible_v2: raw_item.is_fungible_v2, + last_transaction_version: raw_item.last_transaction_version, + last_transaction_timestamp: raw_item.last_transaction_timestamp, + decimals: raw_item.decimals, + is_deleted_v2: raw_item.is_deleted_v2, } - Ok(None) } } diff --git a/rust/processor/src/processors/nft_metadata_processor.rs b/rust/processor/src/processors/nft_metadata_processor.rs index 6a9503d5b..a5c6c610c 100644 --- a/rust/processor/src/processors/nft_metadata_processor.rs +++ b/rust/processor/src/processors/nft_metadata_processor.rs @@ -3,15 +3,20 @@ use super::{DefaultProcessingResult, ProcessorName, ProcessorTrait}; use crate::{ - db::postgres::models::{ - object_models::v2_object_utils::{ - ObjectAggregatedData, ObjectAggregatedDataMapping, ObjectWithMetadata, + db::{ + common::models::token_v2_models::raw_v2_token_datas::{ + RawCurrentTokenDataV2, RawTokenDataV2, }, - resources::FromWriteResource, - token_models::tokens::{TableHandleToOwner, TableMetadataForToken}, - token_v2_models::{ - v2_collections::{CollectionV2, CurrentCollectionV2, CurrentCollectionV2PK}, - v2_token_datas::{CurrentTokenDataV2, CurrentTokenDataV2PK, TokenDataV2}, + postgres::models::{ + object_models::v2_object_utils::{ + ObjectAggregatedData, ObjectAggregatedDataMapping, ObjectWithMetadata, + }, + resources::FromWriteResource, + token_models::tokens::{TableHandleToOwner, TableMetadataForToken}, + token_v2_models::{ + v2_collections::{CollectionV2, CurrentCollectionV2, CurrentCollectionV2PK}, + v2_token_datas::CurrentTokenDataV2PK, + }, }, }, gap_detectors::ProcessingResult, @@ -195,7 +200,7 @@ impl ProcessorTrait for NftMetadataProcessor { } } -fn clean_token_pubsub_message(ctd: CurrentTokenDataV2, db_chain_id: u64) -> String { +fn clean_token_pubsub_message(ctd: RawCurrentTokenDataV2, db_chain_id: u64) -> String { remove_null_bytes(&format!( "{},{},{},{},{},false", ctd.token_data_id, @@ -224,8 +229,8 @@ async fn parse_v2_token( conn: &mut DbPoolConnection<'_>, query_retries: u32, query_retry_delay_ms: u64, -) -> (Vec, Vec) { - let mut current_token_datas_v2: AHashMap = +) -> (Vec, Vec) { + let mut current_token_datas_v2: AHashMap = AHashMap::new(); let mut current_collections_v2: AHashMap = AHashMap::new(); @@ -268,7 +273,7 @@ async fn parse_v2_token( match wsc.change.as_ref().unwrap() { Change::WriteTableItem(table_item) => { if let Some((_, current_token_data)) = - TokenDataV2::get_v1_from_write_table_item( + RawTokenDataV2::get_v1_from_write_table_item( table_item, txn_version, wsc_index, @@ -298,14 +303,15 @@ async fn parse_v2_token( } }, Change::WriteResource(resource) => { - if let Some((_, current_token_data)) = TokenDataV2::get_v2_from_write_resource( - resource, - txn_version, - wsc_index, - txn_timestamp, - &token_v2_metadata_helper, - ) - .unwrap() + if let Some((_, current_token_data)) = + RawTokenDataV2::get_v2_from_write_resource( + resource, + txn_version, + wsc_index, + txn_timestamp, + &token_v2_metadata_helper, + ) + .unwrap() { current_token_datas_v2 .insert(current_token_data.token_data_id.clone(), current_token_data); @@ -331,7 +337,7 @@ async fn parse_v2_token( let current_token_datas_v2 = current_token_datas_v2 .into_values() - .collect::>(); + .collect::>(); let current_collections_v2 = current_collections_v2 .into_values() .collect::>(); diff --git a/rust/processor/src/processors/parquet_processors/parquet_token_v2_processor.rs b/rust/processor/src/processors/parquet_processors/parquet_token_v2_processor.rs index 0b16ecc51..a12d214f1 
100644 --- a/rust/processor/src/processors/parquet_processors/parquet_token_v2_processor.rs +++ b/rust/processor/src/processors/parquet_processors/parquet_token_v2_processor.rs @@ -8,7 +8,12 @@ use crate::{ ParquetProcessingResult, }, db::{ - parquet::models::token_v2_models::v2_token_ownerships::TokenOwnershipV2, + common::models::token_v2_models::raw_v2_token_datas::{ + RawTokenDataV2, TokenDataV2Convertible, + }, + parquet::models::token_v2_models::{ + v2_token_datas::TokenDataV2, v2_token_ownerships::TokenOwnershipV2, + }, postgres::models::{ fungible_asset_models::v2_fungible_asset_utils::FungibleAssetMetadata, object_models::v2_object_utils::{ @@ -17,7 +22,6 @@ use crate::{ resources::{FromWriteResource, V2TokenResource}, token_models::tokens::{TableHandleToOwner, TableMetadataForToken}, token_v2_models::{ - parquet_v2_token_datas::TokenDataV2, v2_token_ownerships::NFTOwnershipV2, v2_token_utils::{ Burn, BurnEvent, MintEvent, TokenV2Burned, TokenV2Minted, TransferEvent, @@ -129,15 +133,20 @@ impl ProcessorTrait for ParquetTokenV2Processor { let table_handle_to_owner = TableMetadataForToken::get_table_handle_to_owner_from_transactions(&transactions); - let (token_datas_v2, token_ownerships_v2) = parse_v2_token( + let (raw_token_datas_v2, token_ownerships_v2) = parse_v2_token( &transactions, &table_handle_to_owner, &mut transaction_version_to_struct_count, ) .await; + let parquet_token_datas_v2: Vec = raw_token_datas_v2 + .into_iter() + .map(TokenDataV2::from_raw) + .collect(); + let token_data_v2_parquet_data = ParquetDataGeneric { - data: token_datas_v2, + data: parquet_token_datas_v2, }; self.v2_token_datas_sender @@ -175,7 +184,7 @@ async fn parse_v2_token( transactions: &[Transaction], table_handle_to_owner: &TableHandleToOwner, transaction_version_to_struct_count: &mut AHashMap, -) -> (Vec, Vec) { +) -> (Vec, Vec) { // Token V2 and V1 combined let mut token_datas_v2 = vec![]; let mut token_ownerships_v2 = vec![]; @@ -315,15 +324,16 @@ async fn parse_v2_token( let wsc_index = index as i64; match wsc.change.as_ref().unwrap() { Change::WriteTableItem(table_item) => { - if let Some(token_data) = TokenDataV2::get_v1_from_write_table_item( - table_item, - txn_version, - wsc_index, - txn_timestamp, - ) - .unwrap() + if let Some((raw_token_data, _)) = + RawTokenDataV2::get_v1_from_write_table_item( + table_item, + txn_version, + wsc_index, + txn_timestamp, + ) + .unwrap() { - token_datas_v2.push(token_data); + token_datas_v2.push(raw_token_data); transaction_version_to_struct_count .entry(txn_version) .and_modify(|e| *e += 1) @@ -385,18 +395,19 @@ async fn parse_v2_token( } }, Change::WriteResource(resource) => { - if let Some(token_data) = TokenDataV2::get_v2_from_write_resource( - resource, - txn_version, - wsc_index, - txn_timestamp, - &token_v2_metadata_helper, - ) - .unwrap() + if let Some((raw_token_data, _current_token_data)) = + RawTokenDataV2::get_v2_from_write_resource( + resource, + txn_version, + wsc_index, + txn_timestamp, + &token_v2_metadata_helper, + ) + .unwrap() { // Add NFT ownership let mut ownerships = TokenOwnershipV2::get_nft_v2_from_token_data( - &token_data, + &raw_token_data, &token_v2_metadata_helper, ) .unwrap(); @@ -422,7 +433,7 @@ async fn parse_v2_token( .and_modify(|e| *e += ownerships.len() as i64 + 1) .or_insert(ownerships.len() as i64 + 1); token_ownerships_v2.append(&mut ownerships); - token_datas_v2.push(token_data); + token_datas_v2.push(raw_token_data); } // Add burned NFT handling if let Some((nft_ownership, current_nft_ownership)) = diff --git 
a/rust/processor/src/processors/token_v2_processor.rs b/rust/processor/src/processors/token_v2_processor.rs index dfe27f843..37e0c8b4b 100644 --- a/rust/processor/src/processors/token_v2_processor.rs +++ b/rust/processor/src/processors/token_v2_processor.rs @@ -10,6 +10,10 @@ use crate::{ }, raw_v1_token_royalty::{CurrentTokenRoyaltyV1Convertible, RawCurrentTokenRoyaltyV1}, raw_v2_token_activities::{RawTokenActivityV2, TokenActivityV2Convertible}, + raw_v2_token_datas::{ + CurrentTokenDataV2Convertible, RawCurrentTokenDataV2, RawTokenDataV2, + TokenDataV2Convertible, + }, raw_v2_token_metadata::{CurrentTokenV2MetadataConvertible, RawCurrentTokenV2Metadata}, }, postgres::models::{ @@ -613,11 +617,11 @@ impl ProcessorTrait for TokenV2Processor { // Token V2 processing which includes token v1 let ( mut collections_v2, - mut token_datas_v2, + raw_token_datas_v2, mut token_ownerships_v2, current_collections_v2, - current_token_datas_v2, - current_deleted_token_datas_v2, + raw_current_token_datas_v2, + raw_current_deleted_token_datas_v2, current_token_ownerships_v2, current_deleted_token_ownerships_v2, raw_token_activities_v2, @@ -655,6 +659,22 @@ impl ProcessorTrait for TokenV2Processor { .map(TokenActivityV2::from_raw) .collect(); + let mut postgres_token_datas_v2: Vec = raw_token_datas_v2 + .into_iter() + .map(TokenDataV2::from_raw) + .collect(); + + let postgres_current_token_datas_v2: Vec = raw_current_token_datas_v2 + .into_iter() + .map(CurrentTokenDataV2::from_raw) + .collect(); + + let postgres_current_deleted_token_datas_v2: Vec = + raw_current_deleted_token_datas_v2 + .into_iter() + .map(CurrentTokenDataV2::from_raw) + .collect(); + let processing_duration_in_secs = processing_start.elapsed().as_secs_f64(); let db_insertion_start = std::time::Instant::now(); @@ -665,7 +685,7 @@ impl ProcessorTrait for TokenV2Processor { token_ownerships_v2.clear(); } if self.deprecated_tables.contains(TableFlags::TOKEN_DATAS_V2) { - token_datas_v2.clear(); + postgres_token_datas_v2.clear(); } if self.deprecated_tables.contains(TableFlags::COLLECTIONS_V2) { collections_v2.clear(); @@ -683,10 +703,13 @@ impl ProcessorTrait for TokenV2Processor { start_version, end_version, &collections_v2, - &token_datas_v2, + &postgres_token_datas_v2, &token_ownerships_v2, ¤t_collections_v2, - (¤t_token_datas_v2, ¤t_deleted_token_datas_v2), + ( + &postgres_current_token_datas_v2, + &postgres_current_deleted_token_datas_v2, + ), ( ¤t_token_ownerships_v2, ¤t_deleted_token_ownerships_v2, @@ -736,11 +759,11 @@ pub async fn parse_v2_token( query_retry_delay_ms: u64, ) -> ( Vec, - Vec, + Vec, Vec, Vec, - Vec, - Vec, + Vec, + Vec, Vec, Vec, // deleted token ownerships Vec, @@ -756,9 +779,9 @@ pub async fn parse_v2_token( let mut current_collections_v2: AHashMap = AHashMap::new(); - let mut current_token_datas_v2: AHashMap = + let mut current_token_datas_v2: AHashMap = AHashMap::new(); - let mut current_deleted_token_datas_v2: AHashMap = + let mut current_deleted_token_datas_v2: AHashMap = AHashMap::new(); let mut current_token_ownerships_v2: AHashMap< CurrentTokenOwnershipV2PK, @@ -976,7 +999,7 @@ pub async fn parse_v2_token( ); } if let Some((token_data, current_token_data)) = - TokenDataV2::get_v1_from_write_table_item( + RawTokenDataV2::get_v1_from_write_table_item( table_item, txn_version, wsc_index, @@ -1124,8 +1147,8 @@ pub async fn parse_v2_token( current_collection, ); } - if let Some((token_data, current_token_data)) = - TokenDataV2::get_v2_from_write_resource( + if let Some((raw_token_data, current_token_data)) = + 
RawTokenDataV2::get_v2_from_write_resource( resource, txn_version, wsc_index, @@ -1137,7 +1160,7 @@ pub async fn parse_v2_token( // Add NFT ownership let (mut ownerships, current_ownerships) = TokenOwnershipV2::get_nft_v2_from_token_data( - &token_data, + &TokenDataV2::from_raw(raw_token_data.clone()), &token_v2_metadata_helper, ) .unwrap(); @@ -1160,7 +1183,7 @@ pub async fn parse_v2_token( } token_ownerships_v2.append(&mut ownerships); current_token_ownerships_v2.extend(current_ownerships); - token_datas_v2.push(token_data); + token_datas_v2.push(raw_token_data); current_token_datas_v2.insert( current_token_data.token_data_id.clone(), current_token_data, @@ -1170,7 +1193,7 @@ pub async fn parse_v2_token( // Add burned NFT handling for token datas (can probably be merged with below) // This handles the case where token is burned but objectCore is still there if let Some(deleted_token_data) = - TokenDataV2::get_burned_nft_v2_from_write_resource( + RawTokenDataV2::get_burned_nft_v2_from_write_resource( resource, txn_version, txn_timestamp, @@ -1244,7 +1267,7 @@ pub async fn parse_v2_token( Change::DeleteResource(resource) => { // Add burned NFT handling for token datas (can probably be merged with below) if let Some(deleted_token_data) = - TokenDataV2::get_burned_nft_v2_from_delete_resource( + RawTokenDataV2::get_burned_nft_v2_from_delete_resource( resource, txn_version, txn_timestamp, @@ -1305,10 +1328,10 @@ pub async fn parse_v2_token( .collect::>(); let mut current_token_datas_v2 = current_token_datas_v2 .into_values() - .collect::>(); + .collect::>(); let mut current_deleted_token_datas_v2 = current_deleted_token_datas_v2 .into_values() - .collect::>(); + .collect::>(); let mut current_token_ownerships_v2 = current_token_ownerships_v2 .into_values() .collect::>(); diff --git a/rust/sdk-processor/src/config/processor_config.rs b/rust/sdk-processor/src/config/processor_config.rs index 80b4da957..c81e2a8d7 100644 --- a/rust/sdk-processor/src/config/processor_config.rs +++ b/rust/sdk-processor/src/config/processor_config.rs @@ -28,8 +28,11 @@ use processor::{ parquet_v2_fungible_metadata::FungibleAssetMetadataModel, }, token_v2_models::{ - token_claims::CurrentTokenPendingClaim, v1_token_royalty::CurrentTokenRoyaltyV1, - v2_token_activities::TokenActivityV2, v2_token_metadata::CurrentTokenV2Metadata, + token_claims::CurrentTokenPendingClaim, + v1_token_royalty::CurrentTokenRoyaltyV1, + v2_token_activities::TokenActivityV2, + v2_token_datas::{CurrentTokenDataV2, TokenDataV2}, + v2_token_metadata::CurrentTokenV2Metadata, }, transaction_metadata_model::parquet_write_set_size_info::WriteSetSize, }, @@ -168,6 +171,8 @@ impl ProcessorConfig { CurrentTokenRoyaltyV1::TABLE_NAME.to_string(), CurrentTokenV2Metadata::TABLE_NAME.to_string(), TokenActivityV2::TABLE_NAME.to_string(), + TokenDataV2::TABLE_NAME.to_string(), + CurrentTokenDataV2::TABLE_NAME.to_string(), ]), _ => HashSet::new(), // Default case for unsupported processors } diff --git a/rust/sdk-processor/src/parquet_processors/mod.rs b/rust/sdk-processor/src/parquet_processors/mod.rs index 7bce0e52b..f58b0359a 100644 --- a/rust/sdk-processor/src/parquet_processors/mod.rs +++ b/rust/sdk-processor/src/parquet_processors/mod.rs @@ -34,8 +34,11 @@ use processor::{ parquet_v2_fungible_metadata::FungibleAssetMetadataModel, }, token_v2_models::{ - token_claims::CurrentTokenPendingClaim, v1_token_royalty::CurrentTokenRoyaltyV1, - v2_token_activities::TokenActivityV2, v2_token_metadata::CurrentTokenV2Metadata, + 
token_claims::CurrentTokenPendingClaim, + v1_token_royalty::CurrentTokenRoyaltyV1, + v2_token_activities::TokenActivityV2, + v2_token_datas::{CurrentTokenDataV2, TokenDataV2}, + v2_token_metadata::CurrentTokenV2Metadata, }, transaction_metadata_model::parquet_write_set_size_info::WriteSetSize, }, @@ -108,6 +111,8 @@ pub enum ParquetTypeEnum { CurrentTokenRoyaltyV1, CurrentTokenV2Metadata, TokenActivitiesV2, + TokenDatasV2, + CurrentTokenDatasV2, } /// Trait for handling various Parquet types. @@ -198,6 +203,8 @@ impl_parquet_trait!( ParquetTypeEnum::CurrentTokenV2Metadata ); impl_parquet_trait!(TokenActivityV2, ParquetTypeEnum::TokenActivitiesV2); +impl_parquet_trait!(TokenDataV2, ParquetTypeEnum::TokenDatasV2); +impl_parquet_trait!(CurrentTokenDataV2, ParquetTypeEnum::CurrentTokenDatasV2); #[derive(Debug, Clone)] #[enum_dispatch(ParquetTypeTrait)] @@ -223,6 +230,8 @@ pub enum ParquetTypeStructs { CurrentTokenRoyaltyV1(Vec), CurrentTokenV2Metadata(Vec), TokenActivityV2(Vec), + TokenDataV2(Vec), + CurrentTokenDataV2(Vec), } impl ParquetTypeStructs { @@ -269,6 +278,10 @@ impl ParquetTypeStructs { ParquetTypeStructs::CurrentTokenV2Metadata(Vec::new()) }, ParquetTypeEnum::TokenActivitiesV2 => ParquetTypeStructs::TokenActivityV2(Vec::new()), + ParquetTypeEnum::TokenDatasV2 => ParquetTypeStructs::TokenDataV2(Vec::new()), + ParquetTypeEnum::CurrentTokenDatasV2 => { + ParquetTypeStructs::CurrentTokenDataV2(Vec::new()) + }, } } @@ -405,6 +418,18 @@ impl ParquetTypeStructs { ) => { handle_append!(self_data, other_data) }, + ( + ParquetTypeStructs::TokenDataV2(self_data), + ParquetTypeStructs::TokenDataV2(other_data), + ) => { + handle_append!(self_data, other_data) + }, + ( + ParquetTypeStructs::CurrentTokenDataV2(self_data), + ParquetTypeStructs::CurrentTokenDataV2(other_data), + ) => { + handle_append!(self_data, other_data) + }, _ => Err(ProcessorError::ProcessError { message: "Mismatched buffer types in append operation".to_string(), }), diff --git a/rust/sdk-processor/src/parquet_processors/parquet_token_v2_processor.rs b/rust/sdk-processor/src/parquet_processors/parquet_token_v2_processor.rs index e21e522f6..d876f1cdd 100644 --- a/rust/sdk-processor/src/parquet_processors/parquet_token_v2_processor.rs +++ b/rust/sdk-processor/src/parquet_processors/parquet_token_v2_processor.rs @@ -31,8 +31,11 @@ use parquet::schema::types::Type; use processor::{ bq_analytics::generic_parquet_processor::HasParquetSchema, db::parquet::models::token_v2_models::{ - token_claims::CurrentTokenPendingClaim, v1_token_royalty::CurrentTokenRoyaltyV1, - v2_token_activities::TokenActivityV2, v2_token_metadata::CurrentTokenV2Metadata, + token_claims::CurrentTokenPendingClaim, + v1_token_royalty::CurrentTokenRoyaltyV1, + v2_token_activities::TokenActivityV2, + v2_token_datas::{CurrentTokenDataV2, TokenDataV2}, + v2_token_metadata::CurrentTokenV2Metadata, }, }; use std::{collections::HashMap, sync::Arc}; @@ -141,6 +144,15 @@ impl ProcessorTrait for ParquetTokenV2Processor { ParquetTypeEnum::TokenActivitiesV2, TokenActivityV2::schema(), ), + (ParquetTypeEnum::TokenDatasV2, TokenDataV2::schema()), + ( + ParquetTypeEnum::CurrentTokenDatasV2, + CurrentTokenDataV2::schema(), + ), + ( + ParquetTypeEnum::CurrentTokenDatasV2, + CurrentTokenDataV2::schema(), + ), ] .into_iter() .collect(); diff --git a/rust/sdk-processor/src/steps/parquet_token_v2_processor/parquet_token_v2_extractor.rs b/rust/sdk-processor/src/steps/parquet_token_v2_processor/parquet_token_v2_extractor.rs index 3e990252c..9aef4fbc7 100644 --- 
a/rust/sdk-processor/src/steps/parquet_token_v2_processor/parquet_token_v2_extractor.rs
+++ b/rust/sdk-processor/src/steps/parquet_token_v2_processor/parquet_token_v2_extractor.rs
@@ -15,11 +15,15 @@ use processor::{
             raw_token_claims::CurrentTokenPendingClaimConvertible,
             raw_v1_token_royalty::CurrentTokenRoyaltyV1Convertible,
             raw_v2_token_activities::TokenActivityV2Convertible,
+            raw_v2_token_datas::{CurrentTokenDataV2Convertible, TokenDataV2Convertible},
             raw_v2_token_metadata::CurrentTokenV2MetadataConvertible,
         },
         parquet::models::token_v2_models::{
-            token_claims::CurrentTokenPendingClaim, v1_token_royalty::CurrentTokenRoyaltyV1,
-            v2_token_activities::TokenActivityV2, v2_token_metadata::CurrentTokenV2Metadata,
+            token_claims::CurrentTokenPendingClaim,
+            v1_token_royalty::CurrentTokenRoyaltyV1,
+            v2_token_activities::TokenActivityV2,
+            v2_token_datas::{CurrentTokenDataV2, TokenDataV2},
+            v2_token_metadata::CurrentTokenV2Metadata,
         },
         postgres::models::token_models::tokens::TableMetadataForToken,
     },
@@ -84,11 +88,11 @@ impl Processable for ParquetTokenV2Extractor {
 
         let (
             _collections_v2,
-            _token_datas_v2,
+            raw_token_datas_v2,
             _token_ownerships_v2,
             _current_collections_v2,
-            _current_token_datas_v2,
-            _current_deleted_token_datas_v2,
+            raw_current_token_datas_v2,
+            raw_current_deleted_token_datas_v2,
             _current_token_ownerships_v2,
             _current_deleted_token_ownerships_v2,
             raw_token_activities_v2,
@@ -126,6 +130,22 @@ impl Processable for ParquetTokenV2Extractor {
             .map(TokenActivityV2::from_raw)
             .collect();
 
+        let parquet_token_datas_v2: Vec<TokenDataV2> = raw_token_datas_v2
+            .into_iter()
+            .map(TokenDataV2::from_raw)
+            .collect();
+
+        let parquet_current_token_datas_v2: Vec<CurrentTokenDataV2> = raw_current_token_datas_v2
+            .into_iter()
+            .map(CurrentTokenDataV2::from_raw)
+            .collect();
+
+        let parquet_deleted_current_token_datas_v2: Vec<CurrentTokenDataV2> =
+            raw_current_deleted_token_datas_v2
+                .into_iter()
+                .map(CurrentTokenDataV2::from_raw)
+                .collect();
+
         // Print the size of each extracted data type
         debug!("Processed data sizes:");
         debug!(
@@ -141,6 +161,24 @@ impl Processable for ParquetTokenV2Extractor {
             parquet_current_token_v2_metadata.len()
         );
         debug!(" - TokenActivityV2: {}", parquet_token_activities_v2.len());
+        debug!(" - TokenDataV2: {}", parquet_token_datas_v2.len());
+        debug!(
+            " - CurrentTokenDataV2: {}",
+            parquet_current_token_datas_v2.len()
+        );
+        debug!(
+            " - CurrentDeletedTokenDataV2: {}",
+            parquet_deleted_current_token_datas_v2.len()
+        );
+
+        // We are merging these two tables, b/c they are essentially the same table
+        let mut combined_current_token_datas_v2: Vec<CurrentTokenDataV2> = Vec::new();
+        parquet_current_token_datas_v2
+            .iter()
+            .for_each(|x| combined_current_token_datas_v2.push(x.clone()));
+        parquet_deleted_current_token_datas_v2
+            .iter()
+            .for_each(|x| combined_current_token_datas_v2.push(x.clone()));
 
         let mut map: HashMap<ParquetTypeEnum, ParquetTypeStructs> = HashMap::new();
 
@@ -166,6 +204,16 @@ impl Processable for ParquetTokenV2Extractor {
                 ParquetTypeEnum::TokenActivitiesV2,
                 ParquetTypeStructs::TokenActivityV2(parquet_token_activities_v2),
             ),
+            (
+                TableFlags::TOKEN_DATAS_V2,
+                ParquetTypeEnum::TokenDatasV2,
+                ParquetTypeStructs::TokenDataV2(parquet_token_datas_v2),
+            ),
+            (
+                TableFlags::CURRENT_TOKEN_DATAS_V2,
+                ParquetTypeEnum::CurrentTokenDatasV2,
+                ParquetTypeStructs::CurrentTokenDataV2(combined_current_token_datas_v2),
+            ),
         ];
 
         // Populate the map based on opt-in tables
diff --git a/rust/sdk-processor/src/steps/token_v2_processor/token_v2_extractor.rs b/rust/sdk-processor/src/steps/token_v2_processor/token_v2_extractor.rs
index abbe1fe78..c34d6ba9e 100644
--- a/rust/sdk-processor/src/steps/token_v2_processor/token_v2_extractor.rs
+++ b/rust/sdk-processor/src/steps/token_v2_processor/token_v2_extractor.rs
@@ -12,6 +12,7 @@ use processor::{
             raw_token_claims::CurrentTokenPendingClaimConvertible,
             raw_v1_token_royalty::CurrentTokenRoyaltyV1Convertible,
             raw_v2_token_activities::TokenActivityV2Convertible,
+            raw_v2_token_datas::{CurrentTokenDataV2Convertible, TokenDataV2Convertible},
             raw_v2_token_metadata::CurrentTokenV2MetadataConvertible,
         },
         postgres::models::{
@@ -106,11 +107,11 @@ impl Processable for TokenV2Extractor {
 
         let (
             collections_v2,
-            token_datas_v2,
+            raw_token_datas_v2,
             token_ownerships_v2,
             current_collections_v2,
-            current_token_datas_v2,
-            current_deleted_token_datas_v2,
+            raw_current_token_datas_v2,
+            raw_current_deleted_token_datas_v2,
             current_token_ownerships_v2,
             current_deleted_token_ownerships_v2,
             raw_token_activities_v2,
@@ -148,14 +149,30 @@ impl Processable for TokenV2Extractor {
             .map(TokenActivityV2::from_raw)
             .collect();
 
+        let postgres_token_datas_v2: Vec<TokenDataV2> = raw_token_datas_v2
+            .into_iter()
+            .map(TokenDataV2::from_raw)
+            .collect();
+
+        let postgres_current_token_datas_v2: Vec<CurrentTokenDataV2> = raw_current_token_datas_v2
+            .into_iter()
+            .map(CurrentTokenDataV2::from_raw)
+            .collect();
+
+        let postgres_current_deleted_token_datas_v2: Vec<CurrentTokenDataV2> =
+            raw_current_deleted_token_datas_v2
+                .into_iter()
+                .map(CurrentTokenDataV2::from_raw)
+                .collect();
+
         Ok(Some(TransactionContext {
             data: (
                 collections_v2,
-                token_datas_v2,
+                postgres_token_datas_v2,
                 token_ownerships_v2,
                 current_collections_v2,
-                current_token_datas_v2,
-                current_deleted_token_datas_v2,
+                postgres_current_token_datas_v2,
+                postgres_current_deleted_token_datas_v2,
                 current_token_ownerships_v2,
                 current_deleted_token_ownerships_v2,
                 postgres_token_activities_v2,