Skip to content

Commit

Permalink
[parquet-sdk][token_v2] migrate CurrentTokenV2Metadata
Browse files Browse the repository at this point in the history
  • Loading branch information
yuunlimm committed Dec 10, 2024
1 parent 460a0a5 commit f8acbf3
Show file tree
Hide file tree
Showing 11 changed files with 242 additions and 80 deletions.
1 change: 1 addition & 0 deletions rust/processor/src/db/common/models/token_v2_models/mod.rs
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
pub mod raw_token_claims;
pub mod raw_v1_token_royalty;
pub mod raw_v2_token_metadata;
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
// Copyright © Aptos Foundation
// SPDX-License-Identifier: Apache-2.0

// This is required because a diesel macro makes clippy sad
#![allow(clippy::extra_unused_lifetimes)]
#![allow(clippy::unused_unit)]

use crate::{
db::postgres::models::{
default_models::move_resources::MoveResource,
object_models::v2_object_utils::ObjectAggregatedDataMapping,
resources::{COIN_ADDR, TOKEN_ADDR, TOKEN_V2_ADDR},
token_models::token_utils::NAME_LENGTH,
},
utils::util::{standardize_address, truncate_str},
};
use anyhow::Context;
use aptos_protos::transaction::v1::WriteResource;
use serde::{Deserialize, Serialize};
use serde_json::Value;

// PK of current_objects, i.e. object_address, resource_type
pub type CurrentTokenV2MetadataPK = (String, String);

#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
pub struct RawCurrentTokenV2Metadata {
pub object_address: String,
pub resource_type: String,
pub data: Value,
pub state_key_hash: String,
pub last_transaction_version: i64,
pub last_transaction_timestamp: chrono::NaiveDateTime,
}

impl Ord for RawCurrentTokenV2Metadata {
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
self.object_address
.cmp(&other.object_address)
.then(self.resource_type.cmp(&other.resource_type))
}
}
impl PartialOrd for RawCurrentTokenV2Metadata {
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
Some(self.cmp(other))
}
}

impl RawCurrentTokenV2Metadata {
/// Parsing unknown resources with 0x4::token::Token
pub fn from_write_resource(
write_resource: &WriteResource,
txn_version: i64,
object_metadatas: &ObjectAggregatedDataMapping,
txn_timestamp: chrono::NaiveDateTime,
) -> anyhow::Result<Option<Self>> {
let object_address = standardize_address(&write_resource.address.to_string());
if let Some(object_data) = object_metadatas.get(&object_address) {
// checking if token_v2
if object_data.token.is_some() {
let move_tag =
MoveResource::convert_move_struct_tag(write_resource.r#type.as_ref().unwrap());
let resource_type_addr = move_tag.get_address();
if matches!(
resource_type_addr.as_str(),
COIN_ADDR | TOKEN_ADDR | TOKEN_V2_ADDR
) {
return Ok(None);
}

let resource = MoveResource::from_write_resource(write_resource, 0, txn_version, 0);

let state_key_hash = object_data.object.get_state_key_hash();
if state_key_hash != resource.state_key_hash {
return Ok(None);
}

let resource_type = truncate_str(&resource.type_, NAME_LENGTH);
return Ok(Some(RawCurrentTokenV2Metadata {
object_address,
resource_type,
data: resource
.data
.context("data must be present in write resource")?,
state_key_hash: resource.state_key_hash,
last_transaction_version: txn_version,
last_transaction_timestamp: txn_timestamp,
}));
}
}
Ok(None)
}
}

pub trait CurrentTokenV2MetadataConvertible {
fn from_raw(raw_item: RawCurrentTokenV2Metadata) -> Self;
}
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
pub mod token_claims;
pub mod v1_token_royalty;
pub mod v2_token_metadata;
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
// Copyright © Aptos Foundation
// SPDX-License-Identifier: Apache-2.0

// This is required because a diesel macro makes clippy sad
#![allow(clippy::extra_unused_lifetimes)]
#![allow(clippy::unused_unit)]

use crate::{
bq_analytics::generic_parquet_processor::{GetTimeStamp, HasVersion, NamedTable},
db::common::models::token_v2_models::raw_v2_token_metadata::{
CurrentTokenV2MetadataConvertible, RawCurrentTokenV2Metadata,
},
};
use allocative_derive::Allocative;
use field_count::FieldCount;
use parquet_derive::ParquetRecordWriter;
use serde::{Deserialize, Serialize};

#[derive(
Allocative, Clone, Debug, Default, Deserialize, FieldCount, ParquetRecordWriter, Serialize,
)]
pub struct CurrentTokenV2Metadata {
pub object_address: String,
pub resource_type: String,
pub data: String,
pub state_key_hash: String,
pub last_transaction_version: i64,
#[allocative(skip)]
pub last_transaction_timestamp: chrono::NaiveDateTime,
}
impl NamedTable for CurrentTokenV2Metadata {
const TABLE_NAME: &'static str = "current_token_v2_metadata";
}

impl HasVersion for CurrentTokenV2Metadata {
fn version(&self) -> i64 {
self.last_transaction_version
}
}

impl GetTimeStamp for CurrentTokenV2Metadata {
fn get_timestamp(&self) -> chrono::NaiveDateTime {
self.last_transaction_timestamp
}
}

impl CurrentTokenV2MetadataConvertible for CurrentTokenV2Metadata {
// TODO: consider returning a Result
fn from_raw(raw_item: RawCurrentTokenV2Metadata) -> Self {
Self {
object_address: raw_item.object_address,
resource_type: raw_item.resource_type,
data: canonical_json::to_string(&raw_item.data).unwrap(), // TODO: handle better
state_key_hash: raw_item.state_key_hash,
last_transaction_version: raw_item.last_transaction_version,
last_transaction_timestamp: raw_item.last_transaction_timestamp,
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,27 +6,19 @@
#![allow(clippy::unused_unit)]

use crate::{
db::postgres::models::{
default_models::move_resources::MoveResource,
object_models::v2_object_utils::ObjectAggregatedDataMapping,
resources::{COIN_ADDR, TOKEN_ADDR, TOKEN_V2_ADDR},
token_models::token_utils::NAME_LENGTH,
db::common::models::token_v2_models::raw_v2_token_metadata::{
CurrentTokenV2MetadataConvertible, RawCurrentTokenV2Metadata,
},
schema::current_token_v2_metadata,
utils::util::{standardize_address, truncate_str},
};
use anyhow::Context;
use aptos_protos::transaction::v1::WriteResource;
use field_count::FieldCount;
use serde::{Deserialize, Serialize};
use serde_json::Value;

// PK of current_objects, i.e. object_address, resource_type
pub type CurrentTokenV2MetadataPK = (String, String);

#[derive(
Clone, Debug, Deserialize, Eq, FieldCount, Identifiable, Insertable, PartialEq, Serialize,
)]
#[derive(Clone, Debug, Deserialize, FieldCount, Identifiable, Insertable, Serialize)]
#[diesel(primary_key(object_address, resource_type))]
#[diesel(table_name = current_token_v2_metadata)]
pub struct CurrentTokenV2Metadata {
Expand All @@ -37,59 +29,14 @@ pub struct CurrentTokenV2Metadata {
pub last_transaction_version: i64,
}

impl Ord for CurrentTokenV2Metadata {
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
self.object_address
.cmp(&other.object_address)
.then(self.resource_type.cmp(&other.resource_type))
}
}
impl PartialOrd for CurrentTokenV2Metadata {
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
Some(self.cmp(other))
}
}

impl CurrentTokenV2Metadata {
/// Parsing unknown resources with 0x4::token::Token
pub fn from_write_resource(
write_resource: &WriteResource,
txn_version: i64,
object_metadatas: &ObjectAggregatedDataMapping,
) -> anyhow::Result<Option<Self>> {
let object_address = standardize_address(&write_resource.address.to_string());
if let Some(object_data) = object_metadatas.get(&object_address) {
// checking if token_v2
if object_data.token.is_some() {
let move_tag =
MoveResource::convert_move_struct_tag(write_resource.r#type.as_ref().unwrap());
let resource_type_addr = move_tag.get_address();
if matches!(
resource_type_addr.as_str(),
COIN_ADDR | TOKEN_ADDR | TOKEN_V2_ADDR
) {
return Ok(None);
}

let resource = MoveResource::from_write_resource(write_resource, 0, txn_version, 0);

let state_key_hash = object_data.object.get_state_key_hash();
if state_key_hash != resource.state_key_hash {
return Ok(None);
}

let resource_type = truncate_str(&resource.type_, NAME_LENGTH);
return Ok(Some(CurrentTokenV2Metadata {
object_address,
resource_type,
data: resource
.data
.context("data must be present in write resource")?,
state_key_hash: resource.state_key_hash,
last_transaction_version: txn_version,
}));
}
impl CurrentTokenV2MetadataConvertible for CurrentTokenV2Metadata {
fn from_raw(raw_item: RawCurrentTokenV2Metadata) -> Self {
Self {
object_address: raw_item.object_address,
resource_type: raw_item.resource_type,
data: raw_item.data,
state_key_hash: raw_item.state_key_hash,
last_transaction_version: raw_item.last_transaction_version,
}
Ok(None)
}
}
37 changes: 24 additions & 13 deletions rust/processor/src/processors/token_v2_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use crate::{
common::models::token_v2_models::{
raw_token_claims::{CurrentTokenPendingClaimConvertible, RawCurrentTokenPendingClaim},
raw_v1_token_royalty::{CurrentTokenRoyaltyV1Convertible, RawCurrentTokenRoyaltyV1},
raw_v2_token_metadata::{CurrentTokenV2MetadataConvertible, RawCurrentTokenV2Metadata},
},
postgres::models::{
fungible_asset_models::v2_fungible_asset_utils::FungibleAssetMetadata,
Expand Down Expand Up @@ -618,7 +619,7 @@ impl ProcessorTrait for TokenV2Processor {
current_token_ownerships_v2,
current_deleted_token_ownerships_v2,
token_activities_v2,
mut current_token_v2_metadata,
raw_current_token_v2_metadata,
raw_current_token_royalties_v1,
raw_current_token_claims,
) = parse_v2_token(
Expand All @@ -641,6 +642,12 @@ impl ProcessorTrait for TokenV2Processor {
.map(CurrentTokenRoyaltyV1::from_raw)
.collect();

let mut postgres_current_token_v2_metadata: Vec<CurrentTokenV2Metadata> =
raw_current_token_v2_metadata
.into_iter()
.map(CurrentTokenV2Metadata::from_raw)
.collect();

let processing_duration_in_secs = processing_start.elapsed().as_secs_f64();
let db_insertion_start = std::time::Instant::now();

Expand All @@ -660,7 +667,7 @@ impl ProcessorTrait for TokenV2Processor {
.deprecated_tables
.contains(TableFlags::CURRENT_TOKEN_V2_METADATA)
{
current_token_v2_metadata.clear();
postgres_current_token_v2_metadata.clear();
}

let tx_result = insert_to_db(
Expand All @@ -678,7 +685,7 @@ impl ProcessorTrait for TokenV2Processor {
&current_deleted_token_ownerships_v2,
),
&token_activities_v2,
&current_token_v2_metadata,
&postgres_current_token_v2_metadata,
&postgres_current_token_royalties_v1,
&postgres_current_token_claims,
&self.per_table_chunk_sizes,
Expand Down Expand Up @@ -730,7 +737,7 @@ pub async fn parse_v2_token(
Vec<CurrentTokenOwnershipV2>,
Vec<CurrentTokenOwnershipV2>, // deleted token ownerships
Vec<TokenActivityV2>,
Vec<CurrentTokenV2Metadata>,
Vec<RawCurrentTokenV2Metadata>,
Vec<RawCurrentTokenRoyaltyV1>,
Vec<RawCurrentTokenPendingClaim>,
) {
Expand Down Expand Up @@ -758,8 +765,10 @@ pub async fn parse_v2_token(
// we can still get the object core metadata for it
let mut token_v2_metadata_helper: ObjectAggregatedDataMapping = AHashMap::new();
// Basically token properties
let mut current_token_v2_metadata: AHashMap<CurrentTokenV2MetadataPK, CurrentTokenV2Metadata> =
AHashMap::new();
let mut current_token_v2_metadata: AHashMap<
CurrentTokenV2MetadataPK,
RawCurrentTokenV2Metadata,
> = AHashMap::new();
let mut current_token_royalties_v1: AHashMap<CurrentTokenDataV2PK, RawCurrentTokenRoyaltyV1> =
AHashMap::new();
// migrating this from v1 token model as we don't have any replacement table for this
Expand Down Expand Up @@ -1207,12 +1216,14 @@ pub async fn parse_v2_token(
}

// Track token properties
if let Some(token_metadata) = CurrentTokenV2Metadata::from_write_resource(
resource,
txn_version,
&token_v2_metadata_helper,
)
.unwrap()
if let Some(token_metadata) =
RawCurrentTokenV2Metadata::from_write_resource(
resource,
txn_version,
&token_v2_metadata_helper,
txn_timestamp,
)
.unwrap()
{
current_token_v2_metadata.insert(
(
Expand Down Expand Up @@ -1296,7 +1307,7 @@ pub async fn parse_v2_token(
.collect::<Vec<CurrentTokenOwnershipV2>>();
let mut current_token_v2_metadata = current_token_v2_metadata
.into_values()
.collect::<Vec<CurrentTokenV2Metadata>>();
.collect::<Vec<RawCurrentTokenV2Metadata>>();
let mut current_deleted_token_ownerships_v2 = current_deleted_token_ownerships_v2
.into_values()
.collect::<Vec<CurrentTokenOwnershipV2>>();
Expand Down
2 changes: 2 additions & 0 deletions rust/sdk-processor/src/config/processor_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ use processor::{
},
token_v2_models::{
token_claims::CurrentTokenPendingClaim, v1_token_royalty::CurrentTokenRoyaltyV1,
v2_token_metadata::CurrentTokenV2Metadata,
},
transaction_metadata_model::parquet_write_set_size_info::WriteSetSize,
},
Expand Down Expand Up @@ -165,6 +166,7 @@ impl ProcessorConfig {
ProcessorName::ParquetTokenV2Processor => HashSet::from([
CurrentTokenPendingClaim::TABLE_NAME.to_string(),
CurrentTokenRoyaltyV1::TABLE_NAME.to_string(),
CurrentTokenV2Metadata::TABLE_NAME.to_string(),
]),
_ => HashSet::new(), // Default case for unsupported processors
}
Expand Down
Loading

0 comments on commit f8acbf3

Please sign in to comment.