diff --git a/rust/processor/migrations/2024-03-07-224504_fungible_asset_metadata_is_token_v2/down.sql b/rust/processor/migrations/2024-03-07-224504_fungible_asset_metadata_is_token_v2/down.sql new file mode 100644 index 000000000..c92803930 --- /dev/null +++ b/rust/processor/migrations/2024-03-07-224504_fungible_asset_metadata_is_token_v2/down.sql @@ -0,0 +1,9 @@ +-- This file should undo anything in `up.sql` +ALTER TABLE fungible_asset_metadata +DROP COLUMN IF EXISTS is_token_v2; +ALTER TABLE objects +ADD COLUMN IF NOT EXISTS is_token BOOLEAN, +ADD COLUMN IF NOT EXISTS is_fungible_asset BOOLEAN; +ALTER TABLE current_objects +ADD COLUMN IF NOT EXISTS is_token BOOLEAN, +ADD COLUMN IF NOT EXISTS is_fungible_asset BOOLEAN; \ No newline at end of file diff --git a/rust/processor/migrations/2024-03-07-224504_fungible_asset_metadata_is_token_v2/up.sql b/rust/processor/migrations/2024-03-07-224504_fungible_asset_metadata_is_token_v2/up.sql new file mode 100644 index 000000000..9899f4f3d --- /dev/null +++ b/rust/processor/migrations/2024-03-07-224504_fungible_asset_metadata_is_token_v2/up.sql @@ -0,0 +1,9 @@ +-- Your SQL goes here +ALTER TABLE fungible_asset_metadata +ADD COLUMN IF NOT EXISTS is_token_v2 BOOLEAN; +ALTER TABLE objects +DROP COLUMN IF EXISTS is_fungible_asset, +DROP COLUMN IF EXISTS is_token; +ALTER TABLE current_objects +DROP COLUMN IF EXISTS is_fungible_asset, +DROP COLUMN IF EXISTS is_token; \ No newline at end of file diff --git a/rust/processor/src/models/fungible_asset_models/v2_fungible_metadata.rs b/rust/processor/src/models/fungible_asset_models/v2_fungible_metadata.rs index 166827d62..0ee419f27 100644 --- a/rust/processor/src/models/fungible_asset_models/v2_fungible_metadata.rs +++ b/rust/processor/src/models/fungible_asset_models/v2_fungible_metadata.rs @@ -9,7 +9,7 @@ use super::v2_fungible_asset_utils::FungibleAssetMetadata; use crate::{ models::{ coin_models::coin_utils::{CoinInfoType, CoinResource}, - object_models::{v2_object_utils::ObjectAggregatedDataMapping, v2_objects::Object}, + object_models::v2_object_utils::ObjectAggregatedDataMapping, token_v2_models::v2_token_utils::TokenStandard, }, schema::fungible_asset_metadata, @@ -17,7 +17,8 @@ use crate::{ }; use ahash::AHashMap; use aptos_protos::transaction::v1::WriteResource; -use diesel::sql_types::Text; +use diesel::prelude::*; +use diesel_async::RunQueryDsl; use field_count::FieldCount; use serde::{Deserialize, Serialize}; @@ -42,12 +43,27 @@ pub struct FungibleAssetMetadataModel { pub supply_aggregator_table_handle_v1: Option, pub supply_aggregator_table_key_v1: Option, pub token_standard: String, + pub is_token_v2: Option, } -#[derive(Debug, QueryableByName)] -pub struct AssetTypeFromTable { - #[diesel(sql_type = Text)] +#[derive(Debug, Deserialize, Identifiable, Queryable, Serialize)] +#[diesel(primary_key(asset_type))] +#[diesel(table_name = fungible_asset_metadata)] +pub struct FungibleAssetMetadataQuery { pub asset_type: String, + pub creator_address: String, + pub name: String, + pub symbol: String, + pub decimals: i32, + pub icon_uri: Option, + pub project_uri: Option, + pub last_transaction_version: i64, + pub last_transaction_timestamp: chrono::NaiveDateTime, + pub supply_aggregator_table_handle_v1: Option, + pub supply_aggregator_table_key_v1: Option, + pub token_standard: String, + pub inserted_at: chrono::NaiveDateTime, + pub is_token_v2: Option, } impl FungibleAssetMetadataModel { @@ -65,10 +81,7 @@ impl FungibleAssetMetadataModel { let asset_type = standardize_address(&write_resource.address.to_string()); if let Some(object_metadata) = object_metadatas.get(&asset_type) { let object = &object_metadata.object.object_core; - // Do not write here if asset is fungible token - if object_metadata.token.is_some() { - return Ok(None); - } + let is_token_v2 = object_metadata.token.is_some(); return Ok(Some(Self { asset_type: asset_type.clone(), @@ -83,6 +96,7 @@ impl FungibleAssetMetadataModel { supply_aggregator_table_handle_v1: None, supply_aggregator_table_key_v1: None, token_standard: TokenStandard::V2.to_string(), + is_token_v2: Some(is_token_v2), })); } } @@ -121,6 +135,7 @@ impl FungibleAssetMetadataModel { supply_aggregator_table_handle_v1: supply_aggregator_table_handle, supply_aggregator_table_key_v1: supply_aggregator_table_key, token_standard: TokenStandard::V1.to_string(), + is_token_v2: Some(false), })) } else { Ok(None) @@ -135,39 +150,48 @@ impl FungibleAssetMetadataModel { /// 2. If metadata is not present, we will do a lookup in the db. pub async fn is_address_fungible_asset( conn: &mut PgPoolConnection<'_>, - address: &str, + asset_type: &str, object_aggregated_data_mapping: &ObjectAggregatedDataMapping, txn_version: i64, ) -> bool { // 1. If metadata is present without token object, then it's not a token - if let Some(object_data) = object_aggregated_data_mapping.get(address) { + if let Some(object_data) = object_aggregated_data_mapping.get(asset_type) { if object_data.fungible_asset_metadata.is_some() { return object_data.token.is_none(); } } // 2. If metadata is not present, we will do a lookup in the db. - // The object must exist in current_objects table for this processor to proceed - // If it doesn't exist or is null, then you probably need to backfill objects processor - match Object::get_current_object(conn, address, txn_version).await { - Ok(object) => { - if let (Some(is_fa), Some(is_token)) = (object.is_fungible_asset, object.is_token) { - return is_fa && !is_token; + match FungibleAssetMetadataQuery::get_by_asset_type(conn, asset_type).await { + Ok(metadata) => { + if let Some(is_token_v2) = metadata.is_token_v2 { + return !is_token_v2; } - tracing::error!("is_fungible_asset and/or is_token is null for object_address: {}. You should probably backfill db.", address); - // By default, assume it's not a fungible token and index it as a fungible asset - true + // If is_token_v2 is null, then the metadata is a v1 coin info, and it's not a token + false }, Err(_) => { tracing::error!( transaction_version = txn_version, - lookup_key = address, - "Missing current_object for object_address: {}. You probably should backfill db.", - address, + lookup_key = asset_type, + "Missing fungible_asset_metadata for asset_type: {}. You probably should backfill db.", + asset_type, ); - // By default, assume it's not a fungible token and index it as a fungible asset + // Default true }, } } } + +impl FungibleAssetMetadataQuery { + pub async fn get_by_asset_type( + conn: &mut PgPoolConnection<'_>, + asset_type: &str, + ) -> diesel::QueryResult { + fungible_asset_metadata::table + .filter(fungible_asset_metadata::asset_type.eq(asset_type)) + .first::(conn) + .await + } +} diff --git a/rust/processor/src/models/object_models/v2_objects.rs b/rust/processor/src/models/object_models/v2_objects.rs index 85e2d6728..ad1fd605d 100644 --- a/rust/processor/src/models/object_models/v2_objects.rs +++ b/rust/processor/src/models/object_models/v2_objects.rs @@ -34,8 +34,6 @@ pub struct Object { pub state_key_hash: String, pub guid_creation_num: BigDecimal, pub allow_ungated_transfer: bool, - pub is_token: Option, - pub is_fungible_asset: Option, pub is_deleted: bool, } @@ -49,8 +47,6 @@ pub struct CurrentObject { pub allow_ungated_transfer: bool, pub last_guid_creation_num: BigDecimal, pub last_transaction_version: i64, - pub is_token: Option, - pub is_fungible_asset: Option, pub is_deleted: bool, } @@ -66,8 +62,6 @@ pub struct CurrentObjectQuery { pub last_transaction_version: i64, pub is_deleted: bool, pub inserted_at: chrono::NaiveDateTime, - pub is_token: Option, - pub is_fungible_asset: Option, } impl Object { @@ -91,10 +85,6 @@ impl Object { state_key_hash: object_with_metadata.state_key_hash.clone(), guid_creation_num: object_core.guid_creation_num.clone(), allow_ungated_transfer: object_core.allow_ungated_transfer, - is_token: Some(object_aggregated_metadata.token.is_some()), - is_fungible_asset: Some( - object_aggregated_metadata.fungible_asset_metadata.is_some(), - ), is_deleted: false, }, CurrentObject { @@ -104,10 +94,6 @@ impl Object { allow_ungated_transfer: object_core.allow_ungated_transfer, last_guid_creation_num: object_core.guid_creation_num.clone(), last_transaction_version: txn_version, - is_token: Some(object_aggregated_metadata.token.is_some()), - is_fungible_asset: Some( - object_aggregated_metadata.fungible_asset_metadata.is_some(), - ), is_deleted: false, }, ))) @@ -158,8 +144,6 @@ impl Object { state_key_hash: resource.state_key_hash.clone(), guid_creation_num: previous_object.last_guid_creation_num.clone(), allow_ungated_transfer: previous_object.allow_ungated_transfer, - is_token: previous_object.is_token, - is_fungible_asset: previous_object.is_fungible_asset, is_deleted: true, }, CurrentObject { @@ -169,8 +153,6 @@ impl Object { last_guid_creation_num: previous_object.last_guid_creation_num.clone(), allow_ungated_transfer: previous_object.allow_ungated_transfer, last_transaction_version: txn_version, - is_token: previous_object.is_token, - is_fungible_asset: previous_object.is_fungible_asset, is_deleted: true, }, ))) @@ -198,8 +180,6 @@ impl Object { allow_ungated_transfer: res.allow_ungated_transfer, last_guid_creation_num: res.last_guid_creation_num, last_transaction_version: res.last_transaction_version, - is_token: res.is_token, - is_fungible_asset: res.is_fungible_asset, is_deleted: res.is_deleted, }); }, diff --git a/rust/processor/src/models/token_v2_models/v2_token_datas.rs b/rust/processor/src/models/token_v2_models/v2_token_datas.rs index b82293eab..b1c497ddc 100644 --- a/rust/processor/src/models/token_v2_models/v2_token_datas.rs +++ b/rust/processor/src/models/token_v2_models/v2_token_datas.rs @@ -8,14 +8,19 @@ use super::v2_token_utils::{TokenStandard, TokenV2}; use crate::{ models::{ - object_models::{v2_object_utils::ObjectAggregatedDataMapping, v2_objects::Object}, - token_models::token_utils::TokenWriteSet, + object_models::v2_object_utils::ObjectAggregatedDataMapping, + token_models::{ + collection_datas::{QUERY_RETRIES, QUERY_RETRY_DELAY_MS}, + token_utils::TokenWriteSet, + }, }, schema::{current_token_datas_v2, token_datas_v2}, utils::{database::PgPoolConnection, util::standardize_address}, }; use aptos_protos::transaction::v1::{WriteResource, WriteTableItem}; use bigdecimal::{BigDecimal, Zero}; +use diesel::prelude::*; +use diesel_async::RunQueryDsl; use field_count::FieldCount; use serde::{Deserialize, Serialize}; @@ -63,6 +68,27 @@ pub struct CurrentTokenDataV2 { pub decimals: i64, } +#[derive(Debug, Deserialize, Identifiable, Queryable, Serialize)] +#[diesel(primary_key(token_data_id))] +#[diesel(table_name = current_token_datas_v2)] +pub struct CurrentTokenDataV2Query { + pub token_data_id: String, + pub collection_id: String, + pub token_name: String, + pub maximum: Option, + pub supply: BigDecimal, + pub largest_property_version_v1: Option, + pub token_uri: String, + pub description: String, + pub token_properties: serde_json::Value, + pub token_standard: String, + pub is_fungible_v2: Option, + pub last_transaction_version: i64, + pub last_transaction_timestamp: chrono::NaiveDateTime, + pub inserted_at: chrono::NaiveDateTime, + pub decimals: i64, +} + impl TokenDataV2 { // TODO: remove the useless_asref lint when new clippy nighly is released. #[allow(clippy::useless_asref)] @@ -235,40 +261,92 @@ impl TokenDataV2 { /// 2. If metadata is not present, we will do a lookup in the db. pub async fn is_address_fungible_token( conn: &mut PgPoolConnection<'_>, - address: &str, + token_data_id: &str, object_aggregated_data_mapping: &ObjectAggregatedDataMapping, txn_version: i64, ) -> bool { // 1. If metadata is present, the object is a token iff token struct is also present in the object - if let Some(object_data) = object_aggregated_data_mapping.get(address) { + if let Some(object_data) = object_aggregated_data_mapping.get(token_data_id) { if object_data.fungible_asset_metadata.is_some() { return object_data.token.is_some(); } } // 2. If metadata is not present, we will do a lookup in the db. - // The object must exist in current_objects table for this processor to proceed - match Object::get_current_object(conn, address, txn_version).await { - Ok(object) => { - if let Some(is_token) = object.is_token { - return is_token; + match CurrentTokenDataV2::get_current_token_data_v2(conn, txn_version, token_data_id).await + { + Ok(token_data) => { + if let Some(is_fungible_v2) = token_data.is_fungible_v2 { + return is_fungible_v2; } - tracing::error!( - "is_token is null for object_address: {}. You should probably backfill db.", - address - ); - // By default, don't index + // If is_fungible_v2 is null, that's likely because it's a v1 token, which are not fungible false }, Err(_) => { tracing::error!( transaction_version = txn_version, - lookup_key = address, - "Missing current_object for object_address: {}. You probably should backfill db.", - address, + lookup_key = token_data_id, + "Missing current_token_data_v2 for token_data_id: {}. You probably should backfill db.", + token_data_id, ); - // By default, don't index + // Default false }, } } } + +impl CurrentTokenDataV2 { + pub async fn get_current_token_data_v2( + conn: &mut PgPoolConnection<'_>, + txn_version: i64, + token_data_id: &str, + ) -> anyhow::Result { + let mut retries = 0; + while retries < QUERY_RETRIES { + retries += 1; + match CurrentTokenDataV2Query::get_by_token_data_id(conn, token_data_id).await { + Ok(res) => { + return Ok(CurrentTokenDataV2 { + token_data_id: res.token_data_id, + collection_id: res.collection_id, + token_name: res.token_name, + maximum: res.maximum, + supply: res.supply, + largest_property_version_v1: res.largest_property_version_v1, + token_uri: res.token_uri, + token_properties: res.token_properties, + description: res.description, + token_standard: res.token_standard, + is_fungible_v2: res.is_fungible_v2, + last_transaction_version: res.last_transaction_version, + last_transaction_timestamp: res.last_transaction_timestamp, + decimals: res.decimals, + }); + }, + Err(_) => { + tracing::error!( + transaction_version = txn_version, + lookup_key = token_data_id, + "Missing current_token_data_v2 for token_data_id: {}. You probably should backfill db.", + token_data_id, + ); + tokio::time::sleep(std::time::Duration::from_millis(QUERY_RETRY_DELAY_MS)) + .await; + }, + } + } + Err(anyhow::anyhow!("Failed to get token data")) + } +} + +impl CurrentTokenDataV2Query { + pub async fn get_by_token_data_id( + conn: &mut PgPoolConnection<'_>, + token_data_id: &str, + ) -> diesel::QueryResult { + current_token_datas_v2::table + .filter(current_token_datas_v2::token_data_id.eq(token_data_id)) + .first::(conn) + .await + } +} diff --git a/rust/processor/src/processors/objects_processor.rs b/rust/processor/src/processors/objects_processor.rs index 946c85763..269fd9bdd 100644 --- a/rust/processor/src/processors/objects_processor.rs +++ b/rust/processor/src/processors/objects_processor.rs @@ -3,15 +3,9 @@ use super::{ProcessingResult, ProcessorName, ProcessorTrait}; use crate::{ - models::{ - fungible_asset_models::v2_fungible_asset_utils::FungibleAssetMetadata, - object_models::{ - v2_object_utils::{ - ObjectAggregatedData, ObjectAggregatedDataMapping, ObjectWithMetadata, - }, - v2_objects::{CurrentObject, Object}, - }, - token_v2_models::v2_token_utils::TokenV2, + models::object_models::{ + v2_object_utils::{ObjectAggregatedData, ObjectAggregatedDataMapping, ObjectWithMetadata}, + v2_objects::{CurrentObject, Object}, }, schema, utils::{ @@ -103,11 +97,7 @@ fn insert_objects_query( .values(items_to_insert) .on_conflict((transaction_version, write_set_change_index)) .do_update() - .set(( - inserted_at.eq(excluded(inserted_at)), - is_token.eq(excluded(is_token)), - is_fungible_asset.eq(excluded(is_fungible_asset)), - )), + .set((inserted_at.eq(excluded(inserted_at)),)), None, ) } @@ -132,8 +122,6 @@ fn insert_current_objects_query( last_transaction_version.eq(excluded(last_transaction_version)), is_deleted.eq(excluded(is_deleted)), inserted_at.eq(excluded(inserted_at)), - is_token.eq(excluded(is_token)), - is_fungible_asset.eq(excluded(is_fungible_asset)), )), Some( " WHERE current_objects.last_transaction_version <= excluded.last_transaction_version ", @@ -205,29 +193,7 @@ impl ProcessorTrait for ObjectsProcessor { } } - // Second pass to get all other structs related to the object - for wsc in changes.iter() { - if let Change::WriteResource(wr) = wsc.change.as_ref().unwrap() { - let address = standardize_address(&wr.address.to_string()); - - // Find structs related to object - if let Some(aggregated_data) = object_metadata_helper.get_mut(&address) { - if let Some(token) = TokenV2::from_write_resource(wr, txn_version).unwrap() - { - // Object is a token if it has 0x4::token::Token struct - aggregated_data.token = Some(token); - } - if let Some(fungible_asset_metadata) = - FungibleAssetMetadata::from_write_resource(wr, txn_version).unwrap() - { - // Object is a fungible asset if it has a 0x1::fungible_asset::FungibleAssetMetadata - aggregated_data.fungible_asset_metadata = Some(fungible_asset_metadata); - } - } - } - } - - // Third pass to construct the object data + // Second pass to construct the object data for (index, wsc) in changes.iter().enumerate() { let index: i64 = index as i64; match wsc.change.as_ref().unwrap() { diff --git a/rust/processor/src/schema.rs b/rust/processor/src/schema.rs index d8f911b0a..5fb567a42 100644 --- a/rust/processor/src/schema.rs +++ b/rust/processor/src/schema.rs @@ -449,8 +449,6 @@ diesel::table! { last_transaction_version -> Int8, is_deleted -> Bool, inserted_at -> Timestamp, - is_token -> Nullable, - is_fungible_asset -> Nullable, } } @@ -791,6 +789,7 @@ diesel::table! { #[max_length = 10] token_standard -> Varchar, inserted_at -> Timestamp, + is_token_v2 -> Nullable, } } @@ -873,8 +872,6 @@ diesel::table! { allow_ungated_transfer -> Bool, is_deleted -> Bool, inserted_at -> Timestamp, - is_token -> Nullable, - is_fungible_asset -> Nullable, } }