Skip to content

Commit

Permalink
fix parquet token datas none data handling (#649)
Browse files Browse the repository at this point in the history
### Description
The None data wasn't properly handled for TokenDataV2 model.
- updated parquet models to handle None values to have None, not the default String. 


<img width="2022" alt="Screenshot 2024-12-16 at 4 42 19 PM" src="https://github.com/user-attachments/assets/7b3a3866-6a51-45b7-a071-2df198881255" />


### Test Plan
ran a processor for 10 mins with no issues. 
![Screenshot 2024-12-16 at 4 43 12 PM](https://github.com/user-attachments/assets/14681237-c586-4e2a-a998-51fe8486b922)
  • Loading branch information
yuunlimm authored Dec 17, 2024
1 parent c094090 commit 1973e8b
Show file tree
Hide file tree
Showing 19 changed files with 113 additions and 109 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ pub struct RawAnsLookupV2 {
}

pub trait AnsLookupV2Convertible {
fn from_raw(raw_item: &RawAnsLookupV2) -> Self;
fn from_raw(raw_item: RawAnsLookupV2) -> Self;
}

#[derive(Clone, Debug, Deserialize, Serialize, PartialEq, Eq)]
Expand Down Expand Up @@ -165,5 +165,5 @@ impl RawCurrentAnsLookupV2 {
}

pub trait CurrentAnsLookupV2Convertible {
fn from_raw(raw_item: &RawCurrentAnsLookupV2) -> Self;
fn from_raw(raw_item: RawCurrentAnsLookupV2) -> Self;
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ pub struct RawAnsPrimaryNameV2 {
}

pub trait AnsPrimaryNameV2Convertible {
fn from_raw(raw_item: &RawAnsPrimaryNameV2) -> Self;
fn from_raw(raw_item: RawAnsPrimaryNameV2) -> Self;
}

#[derive(Clone, Debug, Deserialize, Serialize, PartialEq, Eq)]
Expand Down Expand Up @@ -158,5 +158,5 @@ impl RawCurrentAnsPrimaryNameV2 {
}

pub trait CurrentAnsPrimaryNameV2Convertible {
fn from_raw(raw_item: &RawCurrentAnsPrimaryNameV2) -> Self;
fn from_raw(raw_item: RawCurrentAnsPrimaryNameV2) -> Self;
}
Original file line number Diff line number Diff line change
Expand Up @@ -56,5 +56,5 @@ impl RawBlockMetadataTransaction {
pub type RawBlockMetadataTransactionModel = RawBlockMetadataTransaction;

pub trait BlockMetadataTransactionConvertible {
fn from_raw(raw_item: &RawBlockMetadataTransaction) -> Self;
fn from_raw(raw_item: RawBlockMetadataTransaction) -> Self;
}
24 changes: 12 additions & 12 deletions rust/processor/src/db/parquet/models/ans_models/ans_lookup_v2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,16 +49,16 @@ impl GetTimeStamp for AnsLookupV2 {
}

impl AnsLookupV2Convertible for AnsLookupV2 {
fn from_raw(raw_item: &RawAnsLookupV2) -> Self {
fn from_raw(raw_item: RawAnsLookupV2) -> Self {
AnsLookupV2 {
transaction_version: raw_item.transaction_version,
write_set_change_index: raw_item.write_set_change_index,
domain: raw_item.domain.clone(),
subdomain: raw_item.subdomain.clone(),
token_standard: raw_item.token_standard.clone(),
registered_address: raw_item.registered_address.clone(),
domain: raw_item.domain,
subdomain: raw_item.subdomain,
token_standard: raw_item.token_standard,
registered_address: raw_item.registered_address,
expiration_timestamp: raw_item.expiration_timestamp,
token_name: raw_item.token_name.clone(),
token_name: raw_item.token_name,
is_deleted: raw_item.is_deleted,
subdomain_expiration_policy: raw_item.subdomain_expiration_policy,
}
Expand Down Expand Up @@ -97,15 +97,15 @@ impl GetTimeStamp for CurrentAnsLookupV2 {
}

impl CurrentAnsLookupV2Convertible for CurrentAnsLookupV2 {
fn from_raw(raw_item: &RawCurrentAnsLookupV2) -> Self {
fn from_raw(raw_item: RawCurrentAnsLookupV2) -> Self {
CurrentAnsLookupV2 {
domain: raw_item.domain.clone(),
subdomain: raw_item.subdomain.clone(),
token_standard: raw_item.token_standard.clone(),
registered_address: raw_item.registered_address.clone(),
domain: raw_item.domain,
subdomain: raw_item.subdomain,
token_standard: raw_item.token_standard,
registered_address: raw_item.registered_address,
last_transaction_version: raw_item.last_transaction_version,
expiration_timestamp: raw_item.expiration_timestamp,
token_name: raw_item.token_name.clone(),
token_name: raw_item.token_name,
is_deleted: raw_item.is_deleted,
subdomain_expiration_policy: raw_item.subdomain_expiration_policy,
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,15 +46,15 @@ impl GetTimeStamp for AnsPrimaryNameV2 {
}

impl AnsPrimaryNameV2Convertible for AnsPrimaryNameV2 {
fn from_raw(raw_item: &RawAnsPrimaryNameV2) -> Self {
fn from_raw(raw_item: RawAnsPrimaryNameV2) -> Self {
AnsPrimaryNameV2 {
txn_version: raw_item.transaction_version,
write_set_change_index: raw_item.write_set_change_index,
registered_address: raw_item.registered_address.clone(),
token_standard: raw_item.token_standard.clone(),
domain: raw_item.domain.clone(),
subdomain: raw_item.subdomain.clone(),
token_name: raw_item.token_name.clone(),
registered_address: raw_item.registered_address,
token_standard: raw_item.token_standard,
domain: raw_item.domain,
subdomain: raw_item.subdomain,
token_name: raw_item.token_name,
is_deleted: raw_item.is_deleted,
block_timestamp: raw_item.transaction_timestamp,
}
Expand Down Expand Up @@ -90,13 +90,13 @@ impl GetTimeStamp for CurrentAnsPrimaryNameV2 {
}

impl CurrentAnsPrimaryNameV2Convertible for CurrentAnsPrimaryNameV2 {
fn from_raw(raw_item: &RawCurrentAnsPrimaryNameV2) -> Self {
fn from_raw(raw_item: RawCurrentAnsPrimaryNameV2) -> Self {
CurrentAnsPrimaryNameV2 {
registered_address: raw_item.registered_address.clone(),
token_standard: raw_item.token_standard.clone(),
domain: raw_item.domain.clone(),
subdomain: raw_item.subdomain.clone(),
token_name: raw_item.token_name.clone(),
registered_address: raw_item.registered_address,
token_standard: raw_item.token_standard,
domain: raw_item.domain,
subdomain: raw_item.subdomain,
token_name: raw_item.token_name,
is_deleted: raw_item.is_deleted,
last_transaction_version: raw_item.last_transaction_version,
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,16 +50,16 @@ impl GetTimeStamp for BlockMetadataTransaction {
}

impl BlockMetadataTransactionConvertible for BlockMetadataTransaction {
fn from_raw(raw_item: &RawBlockMetadataTransaction) -> Self {
fn from_raw(raw_item: RawBlockMetadataTransaction) -> Self {
BlockMetadataTransaction {
txn_version: raw_item.version,
block_height: raw_item.block_height,
block_id: raw_item.id.clone(),
block_id: raw_item.id,
round: raw_item.round,
epoch: raw_item.epoch,
previous_block_votes_bitvec: raw_item.previous_block_votes_bitvec.clone(),
proposer: raw_item.proposer.clone(),
failed_proposer_indices: raw_item.failed_proposer_indices.clone(),
previous_block_votes_bitvec: raw_item.previous_block_votes_bitvec,
proposer: raw_item.proposer,
failed_proposer_indices: raw_item.failed_proposer_indices,
block_timestamp: raw_item.timestamp,
since_unix_epoch: raw_item.ns_since_unix_epoch,
}
Expand Down Expand Up @@ -120,7 +120,7 @@ mod tests {
ns_since_unix_epoch: compute_nanos_since_epoch(time_stamp),
};

let block_metadata_transaction = BlockMetadataTransaction::from_raw(&raw);
let block_metadata_transaction = BlockMetadataTransaction::from_raw(raw);

assert_eq!(block_metadata_transaction.txn_version, 1);
assert_eq!(block_metadata_transaction.block_height, 1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,8 @@ impl FungibleAssetActivityConvertible for FungibleAssetActivity {
storage_id: raw_item.storage_id,
asset_type: raw_item.asset_type,
is_frozen: raw_item.is_frozen,
amount: Some(raw_item.amount.clone().unwrap().to_string()),
event_type: raw_item.event_type.clone(),
amount: raw_item.amount.map(|v| v.to_string()),
event_type: raw_item.event_type,
is_gas_fee: raw_item.is_gas_fee,
gas_fee_payer_address: raw_item.gas_fee_payer_address,
is_transaction_success: raw_item.is_transaction_success,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@ impl GetTimeStamp for CurrentTokenPendingClaim {
}

impl CurrentTokenPendingClaimConvertible for CurrentTokenPendingClaim {
// TODO: consider returning a Result
fn from_raw(raw_item: RawCurrentTokenPendingClaim) -> Self {
Self {
token_data_id_hash: raw_item.token_data_id_hash,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@ use crate::{
};
use allocative_derive::Allocative;
use anyhow::Context;
use bigdecimal::ToPrimitive;
use parquet_derive::ParquetRecordWriter;
use serde::{Deserialize, Serialize};
use tracing::error;

#[derive(Allocative, Clone, Debug, Default, Deserialize, ParquetRecordWriter, Serialize)]
pub struct TokenDataV2 {
Expand All @@ -28,7 +28,7 @@ pub struct TokenDataV2 {
pub token_data_id: String,
pub collection_id: String,
pub token_name: String,
pub largest_property_version_v1: Option<u64>,
pub largest_property_version_v1: Option<String>, // String format of BigDecimal
pub token_uri: String,
pub token_properties: String,
pub description: String,
Expand Down Expand Up @@ -63,13 +63,9 @@ impl TokenDataV2Convertible for TokenDataV2 {
token_data_id: raw_item.token_data_id,
collection_id: raw_item.collection_id,
token_name: raw_item.token_name,
largest_property_version_v1: Some(
raw_item
.largest_property_version_v1
.unwrap()
.to_u64()
.unwrap(),
),
largest_property_version_v1: raw_item
.largest_property_version_v1
.map(|v| v.to_string()),
token_uri: raw_item.token_uri,
token_properties: canonical_json::to_string(&raw_item.token_properties.clone())
.context("Failed to serialize token properties")
Expand All @@ -88,9 +84,9 @@ pub struct CurrentTokenDataV2 {
pub token_data_id: String,
pub collection_id: String,
pub token_name: String,
pub maximum: Option<String>, // BigDecimal
pub supply: Option<String>,
pub largest_property_version_v1: Option<u64>,
pub maximum: Option<String>, // BigDecimal
pub supply: Option<String>, // BigDecimal
pub largest_property_version_v1: Option<String>, // String format of BigDecimal
pub token_uri: String,
pub token_properties: String, // serde_json::Value,
pub description: String,
Expand Down Expand Up @@ -126,25 +122,28 @@ impl CurrentTokenDataV2Convertible for CurrentTokenDataV2 {
token_data_id: raw_item.token_data_id,
collection_id: raw_item.collection_id,
token_name: raw_item.token_name,
maximum: raw_item
.maximum
.map_or(Some(DEFAULT_NONE.to_string()), |v| Some(v.to_string())),
supply: raw_item
.supply
.map_or(Some(DEFAULT_NONE.to_string()), |v| Some(v.to_string())),
maximum: raw_item.maximum.map(|v| v.to_string()),
supply: raw_item.supply.map(|v| v.to_string()),
largest_property_version_v1: raw_item
.largest_property_version_v1
.map(|v| v.to_u64().unwrap_or(0)),
.map(|v| v.to_string()),
token_uri: raw_item.token_uri,
token_properties: canonical_json::to_string(&raw_item.token_properties)
.unwrap_or_else(|_| DEFAULT_NONE.to_string()),
token_properties: canonical_json::to_string(&raw_item.token_properties).unwrap_or_else(
|_| {
error!(
"Failed to serialize token_properties to JSON: {:?}",
raw_item.token_properties
);
DEFAULT_NONE.to_string()
},
),
description: raw_item.description,
token_standard: raw_item.token_standard,
is_fungible_v2: raw_item.is_fungible_v2.or(Some(false)), // Defaulting to false if None
is_fungible_v2: raw_item.is_fungible_v2,
last_transaction_version: raw_item.last_transaction_version,
last_transaction_timestamp: raw_item.last_transaction_timestamp,
decimals: raw_item.decimals,
is_deleted_v2: raw_item.is_deleted_v2.or(Some(false)), // Defaulting to false if None
is_deleted_v2: raw_item.is_deleted_v2,
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,18 @@

use crate::{
bq_analytics::generic_parquet_processor::{GetTimeStamp, HasVersion, NamedTable},
db::common::models::token_v2_models::raw_v2_token_metadata::{
CurrentTokenV2MetadataConvertible, RawCurrentTokenV2Metadata,
db::{
common::models::token_v2_models::raw_v2_token_metadata::{
CurrentTokenV2MetadataConvertible, RawCurrentTokenV2Metadata,
},
parquet::models::DEFAULT_NONE,
},
};
use allocative_derive::Allocative;
use field_count::FieldCount;
use parquet_derive::ParquetRecordWriter;
use serde::{Deserialize, Serialize};
use tracing::error;

#[derive(
Allocative, Clone, Debug, Default, Deserialize, FieldCount, ParquetRecordWriter, Serialize,
Expand Down Expand Up @@ -50,7 +54,10 @@ impl CurrentTokenV2MetadataConvertible for CurrentTokenV2Metadata {
Self {
object_address: raw_item.object_address,
resource_type: raw_item.resource_type,
data: canonical_json::to_string(&raw_item.data).unwrap(), // TODO: handle better
data: canonical_json::to_string(&raw_item.data).unwrap_or_else(|_| {
error!("Failed to serialize data to JSON: {:?}", raw_item.data);
DEFAULT_NONE.to_string()
}),
state_key_hash: raw_item.state_key_hash,
last_transaction_version: raw_item.last_transaction_version,
last_transaction_timestamp: raw_item.last_transaction_timestamp,
Expand Down
24 changes: 12 additions & 12 deletions rust/processor/src/db/postgres/models/ans_models/ans_lookup_v2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,16 +34,16 @@ pub struct AnsLookupV2 {
}

impl AnsLookupV2Convertible for AnsLookupV2 {
fn from_raw(raw_item: &RawAnsLookupV2) -> Self {
fn from_raw(raw_item: RawAnsLookupV2) -> Self {
AnsLookupV2 {
transaction_version: raw_item.transaction_version,
write_set_change_index: raw_item.write_set_change_index,
domain: raw_item.domain.clone(),
subdomain: raw_item.subdomain.clone(),
token_standard: raw_item.token_standard.clone(),
registered_address: raw_item.registered_address.clone(),
domain: raw_item.domain,
subdomain: raw_item.subdomain,
token_standard: raw_item.token_standard,
registered_address: raw_item.registered_address,
expiration_timestamp: raw_item.expiration_timestamp,
token_name: raw_item.token_name.clone(),
token_name: raw_item.token_name,
is_deleted: raw_item.is_deleted,
subdomain_expiration_policy: raw_item.subdomain_expiration_policy,
}
Expand Down Expand Up @@ -78,15 +78,15 @@ pub struct CurrentAnsLookupV2 {
}

impl CurrentAnsLookupV2Convertible for CurrentAnsLookupV2 {
fn from_raw(raw_item: &RawCurrentAnsLookupV2) -> Self {
fn from_raw(raw_item: RawCurrentAnsLookupV2) -> Self {
CurrentAnsLookupV2 {
domain: raw_item.domain.clone(),
subdomain: raw_item.subdomain.clone(),
token_standard: raw_item.token_standard.clone(),
registered_address: raw_item.registered_address.clone(),
domain: raw_item.domain,
subdomain: raw_item.subdomain,
token_standard: raw_item.token_standard,
registered_address: raw_item.registered_address,
last_transaction_version: raw_item.last_transaction_version,
expiration_timestamp: raw_item.expiration_timestamp,
token_name: raw_item.token_name.clone(),
token_name: raw_item.token_name,
is_deleted: raw_item.is_deleted,
subdomain_expiration_policy: raw_item.subdomain_expiration_policy,
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,15 +31,15 @@ pub struct AnsPrimaryNameV2 {
}

impl AnsPrimaryNameV2Convertible for AnsPrimaryNameV2 {
fn from_raw(raw_item: &RawAnsPrimaryNameV2) -> Self {
fn from_raw(raw_item: RawAnsPrimaryNameV2) -> Self {
AnsPrimaryNameV2 {
transaction_version: raw_item.transaction_version,
write_set_change_index: raw_item.write_set_change_index,
registered_address: raw_item.registered_address.clone(),
token_standard: raw_item.token_standard.clone(),
domain: raw_item.domain.clone(),
subdomain: raw_item.subdomain.clone(),
token_name: raw_item.token_name.clone(),
registered_address: raw_item.registered_address,
token_standard: raw_item.token_standard,
domain: raw_item.domain,
subdomain: raw_item.subdomain,
token_name: raw_item.token_name,
is_deleted: raw_item.is_deleted,
}
}
Expand Down Expand Up @@ -71,13 +71,13 @@ pub struct CurrentAnsPrimaryNameV2 {
}

impl CurrentAnsPrimaryNameV2Convertible for CurrentAnsPrimaryNameV2 {
fn from_raw(raw_item: &RawCurrentAnsPrimaryNameV2) -> Self {
fn from_raw(raw_item: RawCurrentAnsPrimaryNameV2) -> Self {
CurrentAnsPrimaryNameV2 {
registered_address: raw_item.registered_address.clone(),
token_standard: raw_item.token_standard.clone(),
domain: raw_item.domain.clone(),
subdomain: raw_item.subdomain.clone(),
token_name: raw_item.token_name.clone(),
registered_address: raw_item.registered_address,
token_standard: raw_item.token_standard,
domain: raw_item.domain,
subdomain: raw_item.subdomain,
token_name: raw_item.token_name,
is_deleted: raw_item.is_deleted,
last_transaction_version: raw_item.last_transaction_version,
}
Expand Down
Loading

0 comments on commit 1973e8b

Please sign in to comment.