Skip to content

Commit

Permalink
Remove dependency on objects processor
Browse files Browse the repository at this point in the history
  • Loading branch information
rtso committed Mar 8, 2024
1 parent 93ea78f commit b47c7fe
Show file tree
Hide file tree
Showing 7 changed files with 168 additions and 105 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
-- This file should undo anything in `up.sql`
ALTER TABLE fungible_asset_metadata
DROP COLUMN IF EXISTS is_token_v2;
ALTER TABLE objects
ADD COLUMN IF NOT EXISTS is_token BOOLEAN,
ADD COLUMN IF NOT EXISTS is_fungible_asset BOOLEAN;
ALTER TABLE current_objects
ADD COLUMN IF NOT EXISTS is_token BOOLEAN,
ADD COLUMN IF NOT EXISTS is_fungible_asset BOOLEAN;
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
-- Your SQL goes here
ALTER TABLE fungible_asset_metadata
ADD COLUMN IF NOT EXISTS is_token_v2 BOOLEAN;
ALTER TABLE objects
DROP COLUMN IF EXISTS is_fungible_asset,
DROP COLUMN IF EXISTS is_token;
ALTER TABLE current_objects
DROP COLUMN IF EXISTS is_fungible_asset,
DROP COLUMN IF EXISTS is_token;
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,16 @@ use super::v2_fungible_asset_utils::FungibleAssetMetadata;
use crate::{
models::{
coin_models::coin_utils::{CoinInfoType, CoinResource},
object_models::{v2_object_utils::ObjectAggregatedDataMapping, v2_objects::Object},
object_models::v2_object_utils::ObjectAggregatedDataMapping,
token_v2_models::v2_token_utils::TokenStandard,
},
schema::fungible_asset_metadata,
utils::{database::PgPoolConnection, util::standardize_address},
};
use ahash::AHashMap;
use aptos_protos::transaction::v1::WriteResource;
use diesel::sql_types::Text;
use diesel::prelude::*;
use diesel_async::RunQueryDsl;
use field_count::FieldCount;
use serde::{Deserialize, Serialize};

Expand All @@ -42,12 +43,27 @@ pub struct FungibleAssetMetadataModel {
pub supply_aggregator_table_handle_v1: Option<String>,
pub supply_aggregator_table_key_v1: Option<String>,
pub token_standard: String,
pub is_token_v2: Option<bool>,
}

#[derive(Debug, QueryableByName)]
pub struct AssetTypeFromTable {
#[diesel(sql_type = Text)]
#[derive(Debug, Deserialize, Identifiable, Queryable, Serialize)]
#[diesel(primary_key(asset_type))]
#[diesel(table_name = fungible_asset_metadata)]
pub struct FungibleAssetMetadataQuery {
pub asset_type: String,
pub creator_address: String,
pub name: String,
pub symbol: String,
pub decimals: i32,
pub icon_uri: Option<String>,
pub project_uri: Option<String>,
pub last_transaction_version: i64,
pub last_transaction_timestamp: chrono::NaiveDateTime,
pub supply_aggregator_table_handle_v1: Option<String>,
pub supply_aggregator_table_key_v1: Option<String>,
pub token_standard: String,
pub inserted_at: chrono::NaiveDateTime,
pub is_token_v2: Option<bool>,
}

impl FungibleAssetMetadataModel {
Expand All @@ -65,10 +81,7 @@ impl FungibleAssetMetadataModel {
let asset_type = standardize_address(&write_resource.address.to_string());
if let Some(object_metadata) = object_metadatas.get(&asset_type) {
let object = &object_metadata.object.object_core;
// Do not write here if asset is fungible token
if object_metadata.token.is_some() {
return Ok(None);
}
let is_token_v2 = object_metadata.token.is_some();

return Ok(Some(Self {
asset_type: asset_type.clone(),
Expand All @@ -83,6 +96,7 @@ impl FungibleAssetMetadataModel {
supply_aggregator_table_handle_v1: None,
supply_aggregator_table_key_v1: None,
token_standard: TokenStandard::V2.to_string(),
is_token_v2: Some(is_token_v2),
}));
}
}
Expand Down Expand Up @@ -121,6 +135,7 @@ impl FungibleAssetMetadataModel {
supply_aggregator_table_handle_v1: supply_aggregator_table_handle,
supply_aggregator_table_key_v1: supply_aggregator_table_key,
token_standard: TokenStandard::V1.to_string(),
is_token_v2: Some(false),
}))
} else {
Ok(None)
Expand All @@ -135,39 +150,48 @@ impl FungibleAssetMetadataModel {
/// 2. If metadata is not present, we will do a lookup in the db.
pub async fn is_address_fungible_asset(
conn: &mut PgPoolConnection<'_>,
address: &str,
asset_type: &str,
object_aggregated_data_mapping: &ObjectAggregatedDataMapping,
txn_version: i64,
) -> bool {
// 1. If metadata is present without token object, then it's not a token
if let Some(object_data) = object_aggregated_data_mapping.get(address) {
if let Some(object_data) = object_aggregated_data_mapping.get(asset_type) {
if object_data.fungible_asset_metadata.is_some() {
return object_data.token.is_none();
}
}
// 2. If metadata is not present, we will do a lookup in the db.
// The object must exist in current_objects table for this processor to proceed
// If it doesn't exist or is null, then you probably need to backfill objects processor
match Object::get_current_object(conn, address, txn_version).await {
Ok(object) => {
if let (Some(is_fa), Some(is_token)) = (object.is_fungible_asset, object.is_token) {
return is_fa && !is_token;
match FungibleAssetMetadataQuery::get_by_asset_type(conn, asset_type).await {
Ok(metadata) => {
if let Some(is_token_v2) = metadata.is_token_v2 {
return !is_token_v2;
}

tracing::error!("is_fungible_asset and/or is_token is null for object_address: {}. You should probably backfill db.", address);
// By default, assume it's not a fungible token and index it as a fungible asset
true
// If is_token_v2 is null, then the metadata is a v1 coin info, and it's not a token
false
},
Err(_) => {
tracing::error!(
transaction_version = txn_version,
lookup_key = address,
"Missing current_object for object_address: {}. You probably should backfill db.",
address,
lookup_key = asset_type,
"Missing fungible_asset_metadata for asset_type: {}. You probably should backfill db.",
asset_type,
);
// By default, assume it's not a fungible token and index it as a fungible asset
// Default
true
},
}
}
}

impl FungibleAssetMetadataQuery {
pub async fn get_by_asset_type(
conn: &mut PgPoolConnection<'_>,
asset_type: &str,
) -> diesel::QueryResult<Self> {
fungible_asset_metadata::table
.filter(fungible_asset_metadata::asset_type.eq(asset_type))
.first::<Self>(conn)
.await
}
}
20 changes: 0 additions & 20 deletions rust/processor/src/models/object_models/v2_objects.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,6 @@ pub struct Object {
pub state_key_hash: String,
pub guid_creation_num: BigDecimal,
pub allow_ungated_transfer: bool,
pub is_token: Option<bool>,
pub is_fungible_asset: Option<bool>,
pub is_deleted: bool,
}

Expand All @@ -49,8 +47,6 @@ pub struct CurrentObject {
pub allow_ungated_transfer: bool,
pub last_guid_creation_num: BigDecimal,
pub last_transaction_version: i64,
pub is_token: Option<bool>,
pub is_fungible_asset: Option<bool>,
pub is_deleted: bool,
}

Expand All @@ -66,8 +62,6 @@ pub struct CurrentObjectQuery {
pub last_transaction_version: i64,
pub is_deleted: bool,
pub inserted_at: chrono::NaiveDateTime,
pub is_token: Option<bool>,
pub is_fungible_asset: Option<bool>,
}

impl Object {
Expand All @@ -91,10 +85,6 @@ impl Object {
state_key_hash: object_with_metadata.state_key_hash.clone(),
guid_creation_num: object_core.guid_creation_num.clone(),
allow_ungated_transfer: object_core.allow_ungated_transfer,
is_token: Some(object_aggregated_metadata.token.is_some()),
is_fungible_asset: Some(
object_aggregated_metadata.fungible_asset_metadata.is_some(),
),
is_deleted: false,
},
CurrentObject {
Expand All @@ -104,10 +94,6 @@ impl Object {
allow_ungated_transfer: object_core.allow_ungated_transfer,
last_guid_creation_num: object_core.guid_creation_num.clone(),
last_transaction_version: txn_version,
is_token: Some(object_aggregated_metadata.token.is_some()),
is_fungible_asset: Some(
object_aggregated_metadata.fungible_asset_metadata.is_some(),
),
is_deleted: false,
},
)))
Expand Down Expand Up @@ -158,8 +144,6 @@ impl Object {
state_key_hash: resource.state_key_hash.clone(),
guid_creation_num: previous_object.last_guid_creation_num.clone(),
allow_ungated_transfer: previous_object.allow_ungated_transfer,
is_token: previous_object.is_token,
is_fungible_asset: previous_object.is_fungible_asset,
is_deleted: true,
},
CurrentObject {
Expand All @@ -169,8 +153,6 @@ impl Object {
last_guid_creation_num: previous_object.last_guid_creation_num.clone(),
allow_ungated_transfer: previous_object.allow_ungated_transfer,
last_transaction_version: txn_version,
is_token: previous_object.is_token,
is_fungible_asset: previous_object.is_fungible_asset,
is_deleted: true,
},
)))
Expand Down Expand Up @@ -198,8 +180,6 @@ impl Object {
allow_ungated_transfer: res.allow_ungated_transfer,
last_guid_creation_num: res.last_guid_creation_num,
last_transaction_version: res.last_transaction_version,
is_token: res.is_token,
is_fungible_asset: res.is_fungible_asset,
is_deleted: res.is_deleted,
});
},
Expand Down
114 changes: 96 additions & 18 deletions rust/processor/src/models/token_v2_models/v2_token_datas.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,19 @@
use super::v2_token_utils::{TokenStandard, TokenV2};
use crate::{
models::{
object_models::{v2_object_utils::ObjectAggregatedDataMapping, v2_objects::Object},
token_models::token_utils::TokenWriteSet,
object_models::v2_object_utils::ObjectAggregatedDataMapping,
token_models::{
collection_datas::{QUERY_RETRIES, QUERY_RETRY_DELAY_MS},
token_utils::TokenWriteSet,
},
},
schema::{current_token_datas_v2, token_datas_v2},
utils::{database::PgPoolConnection, util::standardize_address},
};
use aptos_protos::transaction::v1::{WriteResource, WriteTableItem};
use bigdecimal::{BigDecimal, Zero};
use diesel::prelude::*;
use diesel_async::RunQueryDsl;
use field_count::FieldCount;
use serde::{Deserialize, Serialize};

Expand Down Expand Up @@ -63,6 +68,27 @@ pub struct CurrentTokenDataV2 {
pub decimals: i64,
}

#[derive(Debug, Deserialize, Identifiable, Queryable, Serialize)]
#[diesel(primary_key(token_data_id))]
#[diesel(table_name = current_token_datas_v2)]
pub struct CurrentTokenDataV2Query {
pub token_data_id: String,
pub collection_id: String,
pub token_name: String,
pub maximum: Option<BigDecimal>,
pub supply: BigDecimal,
pub largest_property_version_v1: Option<BigDecimal>,
pub token_uri: String,
pub description: String,
pub token_properties: serde_json::Value,
pub token_standard: String,
pub is_fungible_v2: Option<bool>,
pub last_transaction_version: i64,
pub last_transaction_timestamp: chrono::NaiveDateTime,
pub inserted_at: chrono::NaiveDateTime,
pub decimals: i64,
}

impl TokenDataV2 {
// TODO: remove the useless_asref lint when new clippy nighly is released.
#[allow(clippy::useless_asref)]
Expand Down Expand Up @@ -235,40 +261,92 @@ impl TokenDataV2 {
/// 2. If metadata is not present, we will do a lookup in the db.
pub async fn is_address_fungible_token(
conn: &mut PgPoolConnection<'_>,
address: &str,
token_data_id: &str,
object_aggregated_data_mapping: &ObjectAggregatedDataMapping,
txn_version: i64,
) -> bool {
// 1. If metadata is present, the object is a token iff token struct is also present in the object
if let Some(object_data) = object_aggregated_data_mapping.get(address) {
if let Some(object_data) = object_aggregated_data_mapping.get(token_data_id) {
if object_data.fungible_asset_metadata.is_some() {
return object_data.token.is_some();
}
}
// 2. If metadata is not present, we will do a lookup in the db.
// The object must exist in current_objects table for this processor to proceed
match Object::get_current_object(conn, address, txn_version).await {
Ok(object) => {
if let Some(is_token) = object.is_token {
return is_token;
match CurrentTokenDataV2::get_current_token_data_v2(conn, txn_version, token_data_id).await
{
Ok(token_data) => {
if let Some(is_fungible_v2) = token_data.is_fungible_v2 {
return is_fungible_v2;
}
tracing::error!(
"is_token is null for object_address: {}. You should probably backfill db.",
address
);
// By default, don't index
// If is_fungible_v2 is null, that's likely because it's a v1 token, which are not fungible
false
},
Err(_) => {
tracing::error!(
transaction_version = txn_version,
lookup_key = address,
"Missing current_object for object_address: {}. You probably should backfill db.",
address,
lookup_key = token_data_id,
"Missing current_token_data_v2 for token_data_id: {}. You probably should backfill db.",
token_data_id,
);
// By default, don't index
// Default
false
},
}
}
}

impl CurrentTokenDataV2 {
pub async fn get_current_token_data_v2(
conn: &mut PgPoolConnection<'_>,
txn_version: i64,
token_data_id: &str,
) -> anyhow::Result<Self> {
let mut retries = 0;
while retries < QUERY_RETRIES {
retries += 1;
match CurrentTokenDataV2Query::get_by_token_data_id(conn, token_data_id).await {
Ok(res) => {
return Ok(CurrentTokenDataV2 {
token_data_id: res.token_data_id,
collection_id: res.collection_id,
token_name: res.token_name,
maximum: res.maximum,
supply: res.supply,
largest_property_version_v1: res.largest_property_version_v1,
token_uri: res.token_uri,
token_properties: res.token_properties,
description: res.description,
token_standard: res.token_standard,
is_fungible_v2: res.is_fungible_v2,
last_transaction_version: res.last_transaction_version,
last_transaction_timestamp: res.last_transaction_timestamp,
decimals: res.decimals,
});
},
Err(_) => {
tracing::error!(
transaction_version = txn_version,
lookup_key = token_data_id,
"Missing current_token_data_v2 for token_data_id: {}. You probably should backfill db.",
token_data_id,
);
tokio::time::sleep(std::time::Duration::from_millis(QUERY_RETRY_DELAY_MS))
.await;
},
}
}
Err(anyhow::anyhow!("Failed to get token data"))
}
}

impl CurrentTokenDataV2Query {
pub async fn get_by_token_data_id(
conn: &mut PgPoolConnection<'_>,
token_data_id: &str,
) -> diesel::QueryResult<Self> {
current_token_datas_v2::table
.filter(current_token_datas_v2::token_data_id.eq(token_data_id))
.first::<Self>(conn)
.await
}
}
Loading

0 comments on commit b47c7fe

Please sign in to comment.