From f6c39c241982e34e0a1fb3839dab265df0c853f0 Mon Sep 17 00:00:00 2001 From: Aaron Date: Fri, 31 May 2024 15:01:18 -0400 Subject: [PATCH] [index] current unified balances (#366) * [index] concurrent unified balances * fixup! [index] concurrent unified balances --- rust/Cargo.lock | 17 ++ rust/Cargo.toml | 2 + rust/processor/Cargo.toml | 2 + .../down.sql | 4 + .../up.sql | 21 +++ .../src/models/ans_models/ans_utils.rs | 2 +- .../src/models/coin_models/coin_activities.rs | 6 +- .../src/models/coin_models/coin_supply.rs | 3 +- .../src/models/coin_models/coin_utils.rs | 38 +++- .../models/default_models/move_resources.rs | 14 +- .../v2_fungible_asset_balances.rs | 173 ++++++++++++++++-- .../v2_fungible_asset_utils.rs | 6 +- .../v2_fungible_metadata.rs | 45 ++++- .../models/object_models/v2_object_utils.rs | 2 +- .../src/models/stake_models/stake_utils.rs | 4 +- .../src/models/token_models/tokens.rs | 2 +- .../models/token_v2_models/v2_collections.rs | 2 +- .../models/token_v2_models/v2_token_utils.rs | 19 +- .../src/processors/coin_processor.rs | 2 - .../processors/fungible_asset_processor.rs | 120 +++++++++++- rust/processor/src/schema.rs | 26 +++ rust/processor/src/utils/util.rs | 30 ++- 22 files changed, 484 insertions(+), 56 deletions(-) create mode 100644 rust/processor/migrations/2024-05-04-025823_current_unified_fungible_asset_balance/down.sql create mode 100644 rust/processor/migrations/2024-05-04-025823_current_unified_fungible_asset_balance/up.sql diff --git a/rust/Cargo.lock b/rust/Cargo.lock index 402dab961..acb7e643c 100644 --- a/rust/Cargo.lock +++ b/rust/Cargo.lock @@ -594,6 +594,12 @@ version = "0.8.19" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "248e3bacc7dc6baa3b21e405ee045c3047101a49145e7e9eca583ab4c2ca5345" +[[package]] +name = "crunchy" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7a81dae078cea95a014a339291cec439d2f232ebe854a9d672b796c6afafa9b7" + [[package]] name = "crypto-common" version = "0.1.6" @@ -2208,6 +2214,7 @@ dependencies = [ "itertools 0.12.1", "jemallocator", "kanal", + "lazy_static", "native-tls", "num_cpus", "once_cell", @@ -2222,6 +2229,7 @@ dependencies = [ "sha2 0.9.9", "sha3", "strum", + "tiny-keccak", "tokio", "tokio-postgres", "tonic 0.11.0", @@ -3205,6 +3213,15 @@ dependencies = [ "time-core", ] +[[package]] +name = "tiny-keccak" +version = "2.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2c9d3793400a45f954c52e73d068316d76b6f4e36977e3fcebb13a2721e80237" +dependencies = [ + "crunchy", +] + [[package]] name = "tinyvec" version = "1.6.0" diff --git a/rust/Cargo.toml b/rust/Cargo.toml index 3f3d86076..2f984cd89 100644 --- a/rust/Cargo.toml +++ b/rust/Cargo.toml @@ -61,6 +61,7 @@ google-cloud-googleapis = "0.10.0" google-cloud-pubsub = "0.18.0" hex = "0.4.3" itertools = "0.12.1" +lazy_static = "1.4.0" jemallocator = { version = "0.5.0", features = [ "profiling", "unprefixed_malloc_on_supported_platforms", @@ -88,6 +89,7 @@ strum = { version = "0.24.1", features = ["derive"] } tempfile = "3.3.0" toml = "0.7.4" tracing-subscriber = { version = "0.3.17", features = ["json", "env-filter"] } +tiny-keccak = { version = "2.0.2", features = ["keccak", "sha3"] } tokio = { version = "1.35.1", features = ["full"] } tonic = { version = "0.11.0", features = [ "tls", diff --git a/rust/processor/Cargo.toml b/rust/processor/Cargo.toml index 36853359e..96628d6b3 100644 --- a/rust/processor/Cargo.toml +++ b/rust/processor/Cargo.toml @@ -37,6 +37,7 @@ google-cloud-pubsub = { workspace = true } hex = { workspace = true } itertools = { workspace = true } kanal = { workspace = true } +lazy_static = { workspace = true } num_cpus = { workspace = true } once_cell = { workspace = true } prometheus = { workspace = true } @@ -58,6 +59,7 @@ url = { workspace = true } # Postgres SSL support native-tls = { workspace = true } postgres-native-tls = { workspace = true } +tiny-keccak = { workspace = true } tokio-postgres = { workspace = true } [target.'cfg(unix)'.dependencies] diff --git a/rust/processor/migrations/2024-05-04-025823_current_unified_fungible_asset_balance/down.sql b/rust/processor/migrations/2024-05-04-025823_current_unified_fungible_asset_balance/down.sql new file mode 100644 index 000000000..425c1d6da --- /dev/null +++ b/rust/processor/migrations/2024-05-04-025823_current_unified_fungible_asset_balance/down.sql @@ -0,0 +1,4 @@ +-- This file should undo anything in `up.sql` +DROP TABLE IF EXISTS current_unified_fungible_asset_balances; +DROP INDEX IF EXISTS cufab_owner_at_index; +DROP INDEX IF EXISTS cufab_insat_index; diff --git a/rust/processor/migrations/2024-05-04-025823_current_unified_fungible_asset_balance/up.sql b/rust/processor/migrations/2024-05-04-025823_current_unified_fungible_asset_balance/up.sql new file mode 100644 index 000000000..073cf8014 --- /dev/null +++ b/rust/processor/migrations/2024-05-04-025823_current_unified_fungible_asset_balance/up.sql @@ -0,0 +1,21 @@ +-- current fungible asset balances +CREATE TABLE IF NOT EXISTS current_unified_fungible_asset_balances ( + storage_id VARCHAR(66) PRIMARY KEY NOT NULL, + owner_address VARCHAR(66) NOT NULL, + asset_type VARCHAR(66) NOT NULL, + coin_type VARCHAR(1000), + is_primary BOOLEAN, + is_frozen BOOLEAN NOT NULL, + amount_v1 NUMERIC, + amount_v2 NUMERIC, + amount NUMERIC GENERATED ALWAYS AS (COALESCE(amount_v1, 0) + COALESCE(amount_v2, 0)) STORED, + last_transaction_version_v1 BIGINT, + last_transaction_version_v2 BIGINT, + last_transaction_version BIGINT GENERATED ALWAYS AS (GREATEST(last_transaction_version_v1, last_transaction_version_v2)) STORED, + last_transaction_timestamp_v1 TIMESTAMP, + last_transaction_timestamp_v2 TIMESTAMP, + last_transaction_timestamp TIMESTAMP GENERATED ALWAYS AS (GREATEST(last_transaction_timestamp_v1, last_transaction_timestamp_v2)) STORED, + inserted_at TIMESTAMP NOT NULL DEFAULT NOW() +); +CREATE INDEX IF NOT EXISTS cufab_owner_at_index ON current_unified_fungible_asset_balances (owner_address, asset_type); +CREATE INDEX IF NOT EXISTS cufab_insat_index ON current_unified_fungible_asset_balances (inserted_at); diff --git a/rust/processor/src/models/ans_models/ans_utils.rs b/rust/processor/src/models/ans_models/ans_utils.rs index 4562d79c1..b607e78d8 100644 --- a/rust/processor/src/models/ans_models/ans_utils.rs +++ b/rust/processor/src/models/ans_models/ans_utils.rs @@ -220,7 +220,7 @@ impl AnsWriteResource { ans_v2_contract_address: &str, txn_version: i64, ) -> anyhow::Result> { - let type_str = MoveResource::get_outer_type_from_resource(write_resource); + let type_str = MoveResource::get_outer_type_from_write_resource(write_resource); let data = write_resource.data.as_str(); match type_str.clone() { diff --git a/rust/processor/src/models/coin_models/coin_activities.rs b/rust/processor/src/models/coin_models/coin_activities.rs index 86e750117..ab4b53990 100644 --- a/rust/processor/src/models/coin_models/coin_activities.rs +++ b/rust/processor/src/models/coin_models/coin_activities.rs @@ -22,11 +22,13 @@ use crate::{ }, user_transactions_models::signatures::Signature, }, - processors::coin_processor::APTOS_COIN_TYPE_STR, schema::coin_activities, utils::{ counters::PROCESSOR_UNKNOWN_TYPE_COUNT, - util::{get_entry_function_from_user_request, standardize_address, u64_to_bigdecimal}, + util::{ + get_entry_function_from_user_request, standardize_address, u64_to_bigdecimal, + APTOS_COIN_TYPE_STR, + }, }, }; use ahash::AHashMap; diff --git a/rust/processor/src/models/coin_models/coin_supply.rs b/rust/processor/src/models/coin_models/coin_supply.rs index a6b4503af..e6fd91b45 100644 --- a/rust/processor/src/models/coin_models/coin_supply.rs +++ b/rust/processor/src/models/coin_models/coin_supply.rs @@ -7,7 +7,8 @@ use crate::{ models::default_models::move_tables::TableItem, - processors::coin_processor::APTOS_COIN_TYPE_STR, schema::coin_supply, utils::util::hash_str, + schema::coin_supply, + utils::util::{hash_str, APTOS_COIN_TYPE_STR}, }; use anyhow::Context; use aptos_protos::transaction::v1::WriteTableItem; diff --git a/rust/processor/src/models/coin_models/coin_utils.rs b/rust/processor/src/models/coin_models/coin_utils.rs index 40ab07bdc..9478f0eca 100644 --- a/rust/processor/src/models/coin_models/coin_utils.rs +++ b/rust/processor/src/models/coin_models/coin_utils.rs @@ -8,8 +8,8 @@ use crate::{ models::default_models::move_resources::MoveResource, utils::util::{deserialize_from_string, hash_str, standardize_address, truncate_str}, }; -use anyhow::{Context, Result}; -use aptos_protos::transaction::v1::{move_type::Content, MoveType, WriteResource}; +use anyhow::{bail, Context, Result}; +use aptos_protos::transaction::v1::{move_type::Content, DeleteResource, MoveType, WriteResource}; use bigdecimal::BigDecimal; use once_cell::sync::Lazy; use regex::Regex; @@ -229,6 +229,8 @@ impl CoinInfoType { pub enum CoinResource { CoinInfoResource(CoinInfoResource), CoinStoreResource(CoinStoreResource), + CoinInfoDeletion, + CoinStoreDeletion, } impl CoinResource { @@ -262,11 +264,27 @@ impl CoinResource { )) } + fn from_delete_resource_internal(data_type: &str, txn_version: i64) -> Result { + match data_type { + x if x == format!("{}::coin::CoinInfo", COIN_ADDR) => { + Ok(CoinResource::CoinInfoDeletion) + }, + x if x == format!("{}::coin::CoinStore", COIN_ADDR) => { + Ok(CoinResource::CoinStoreDeletion) + }, + _ => bail!( + "Resource unsupported! Call is_resource_supported first. version {} type {}", + txn_version, + data_type + ), + } + } + pub fn from_write_resource( write_resource: &WriteResource, txn_version: i64, ) -> Result> { - let type_str = MoveResource::get_outer_type_from_resource(write_resource); + let type_str = MoveResource::get_outer_type_from_write_resource(write_resource); if !CoinResource::is_resource_supported(type_str.as_str()) { return Ok(None); } @@ -282,6 +300,20 @@ impl CoinResource { txn_version, )?)) } + + pub fn from_delete_resource( + delete_resource: &DeleteResource, + txn_version: i64, + ) -> Result> { + let type_str = MoveResource::get_outer_type_from_delete_resource(delete_resource); + if !CoinResource::is_resource_supported(type_str.as_str()) { + return Ok(None); + } + Ok(Some(Self::from_delete_resource_internal( + &type_str, + txn_version, + )?)) + } } #[derive(Serialize, Deserialize, Debug, Clone)] diff --git a/rust/processor/src/models/default_models/move_resources.rs b/rust/processor/src/models/default_models/move_resources.rs index b588aba27..ac73336bd 100644 --- a/rust/processor/src/models/default_models/move_resources.rs +++ b/rust/processor/src/models/default_models/move_resources.rs @@ -116,7 +116,7 @@ impl MoveResource { } } - pub fn get_outer_type_from_resource(write_resource: &WriteResource) -> String { + pub fn get_outer_type_from_write_resource(write_resource: &WriteResource) -> String { let move_struct_tag = Self::convert_move_struct_tag(write_resource.r#type.as_ref().unwrap()); @@ -127,6 +127,18 @@ impl MoveResource { move_struct_tag.name, ) } + + pub fn get_outer_type_from_delete_resource(delete_resource: &DeleteResource) -> String { + let move_struct_tag = + Self::convert_move_struct_tag(delete_resource.r#type.as_ref().unwrap()); + + format!( + "{}::{}::{}", + move_struct_tag.get_address(), + move_struct_tag.module, + move_struct_tag.name, + ) + } } impl MoveStructTag { diff --git a/rust/processor/src/models/fungible_asset_models/v2_fungible_asset_balances.rs b/rust/processor/src/models/fungible_asset_models/v2_fungible_asset_balances.rs index dce861672..adf510b30 100644 --- a/rust/processor/src/models/fungible_asset_models/v2_fungible_asset_balances.rs +++ b/rust/processor/src/models/fungible_asset_models/v2_fungible_asset_balances.rs @@ -12,19 +12,23 @@ use crate::{ models::{ coin_models::coin_utils::{CoinInfoType, CoinResource}, object_models::v2_object_utils::ObjectAggregatedDataMapping, - token_v2_models::v2_token_utils::TokenStandard, + token_v2_models::v2_token_utils::{TokenStandard, V2_STANDARD}, + }, + schema::{ + current_fungible_asset_balances, current_unified_fungible_asset_balances, + fungible_asset_balances, + }, + utils::util::{ + hex_to_raw_bytes, sha3_256, standardize_address, APTOS_COIN_TYPE_STR, + APT_METADATA_ADDRESS_HEX, APT_METADATA_ADDRESS_RAW, }, - schema::{current_fungible_asset_balances, fungible_asset_balances}, - utils::util::standardize_address, }; use ahash::AHashMap; -use aptos_protos::transaction::v1::WriteResource; -use bigdecimal::BigDecimal; +use aptos_protos::transaction::v1::{DeleteResource, WriteResource}; +use bigdecimal::{BigDecimal, Zero}; use field_count::FieldCount; -use hex::FromHex; use serde::{Deserialize, Serialize}; -use sha2::Digest; -use sha3::Sha3_256; +use std::borrow::Borrow; // Storage id pub type CurrentFungibleAssetBalancePK = String; @@ -62,6 +66,86 @@ pub struct CurrentFungibleAssetBalance { pub token_standard: String, } +#[derive(Clone, Debug, Deserialize, FieldCount, Identifiable, Insertable, Serialize, Default)] +#[diesel(primary_key(storage_id))] +#[diesel(table_name = current_unified_fungible_asset_balances)] +#[diesel(treat_none_as_null = true)] +pub struct CurrentUnifiedFungibleAssetBalance { + pub storage_id: String, + pub owner_address: String, + // metadata address for (paired) Fungible Asset + pub asset_type: String, + pub coin_type: Option, + pub is_primary: Option, + pub is_frozen: bool, + pub amount_v1: Option, + pub amount_v2: Option, + pub last_transaction_version_v1: Option, + pub last_transaction_version_v2: Option, + pub last_transaction_timestamp_v1: Option, + pub last_transaction_timestamp_v2: Option, +} + +fn get_paired_metadata_address(coin_type_name: &str) -> String { + if coin_type_name == APTOS_COIN_TYPE_STR { + APT_METADATA_ADDRESS_HEX.clone() + } else { + let mut preimage = APT_METADATA_ADDRESS_RAW.to_vec(); + preimage.extend(coin_type_name.as_bytes()); + preimage.push(0xFE); + format!("0x{}", hex::encode(sha3_256(&preimage))) + } +} + +fn get_primary_fungible_store_address( + owner_address: &str, + metadata_address: &str, +) -> anyhow::Result { + let mut preimage = hex_to_raw_bytes(owner_address)?; + preimage.append(&mut hex_to_raw_bytes(metadata_address)?); + preimage.push(0xFC); + Ok(standardize_address(&hex::encode(sha3_256(&preimage)))) +} + +impl From<&CurrentFungibleAssetBalance> for CurrentUnifiedFungibleAssetBalance { + fn from(cfab: &CurrentFungibleAssetBalance) -> Self { + if cfab.token_standard.as_str() == V2_STANDARD.borrow().as_str() { + Self { + storage_id: cfab.storage_id.clone(), + owner_address: cfab.owner_address.clone(), + asset_type: cfab.asset_type.clone(), + coin_type: None, + is_primary: Some(cfab.is_primary), + is_frozen: cfab.is_frozen, + amount_v1: None, + amount_v2: Some(cfab.amount.clone()), + last_transaction_version_v1: None, + last_transaction_version_v2: Some(cfab.last_transaction_version), + last_transaction_timestamp_v1: None, + last_transaction_timestamp_v2: Some(cfab.last_transaction_timestamp), + } + } else { + let metadata_addr = get_paired_metadata_address(&cfab.asset_type); + let pfs_addr = get_primary_fungible_store_address(&cfab.owner_address, &metadata_addr) + .expect("calculate pfs_address failed"); + Self { + storage_id: pfs_addr, + owner_address: cfab.owner_address.clone(), + asset_type: metadata_addr, + coin_type: Some(cfab.asset_type.clone()), + is_primary: None, + is_frozen: cfab.is_frozen, + amount_v1: Some(cfab.amount.clone()), + amount_v2: None, + last_transaction_version_v1: Some(cfab.last_transaction_version), + last_transaction_version_v2: None, + last_transaction_timestamp_v1: Some(cfab.last_transaction_timestamp), + last_transaction_timestamp_v2: None, + } + } + } +} + impl FungibleAssetBalance { /// Basically just need to index FA Store, but we'll need to look up FA metadata pub async fn get_v2_from_write_resource( @@ -111,6 +195,57 @@ impl FungibleAssetBalance { Ok(None) } + pub fn get_v1_from_delete_resource( + delete_resource: &DeleteResource, + write_set_change_index: i64, + txn_version: i64, + txn_timestamp: chrono::NaiveDateTime, + ) -> anyhow::Result> { + if let Some(CoinResource::CoinStoreDeletion) = + &CoinResource::from_delete_resource(delete_resource, txn_version)? + { + let coin_info_type = &CoinInfoType::from_move_type( + &delete_resource.r#type.as_ref().unwrap().generic_type_params[0], + delete_resource.type_str.as_ref(), + txn_version, + ); + if let Some(coin_type) = coin_info_type.get_coin_type_below_max() { + let owner_address = standardize_address(delete_resource.address.as_str()); + let storage_id = + CoinInfoType::get_storage_id(coin_type.as_str(), owner_address.as_str()); + let coin_balance = Self { + transaction_version: txn_version, + write_set_change_index, + storage_id: storage_id.clone(), + owner_address: owner_address.clone(), + asset_type: coin_type.clone(), + is_primary: true, + is_frozen: false, + amount: BigDecimal::zero(), + transaction_timestamp: txn_timestamp, + token_standard: TokenStandard::V1.to_string(), + }; + let current_coin_balance = CurrentFungibleAssetBalance { + storage_id, + owner_address, + asset_type: coin_type.clone(), + is_primary: true, + is_frozen: false, + amount: BigDecimal::zero(), + last_transaction_version: txn_version, + last_transaction_timestamp: txn_timestamp, + token_standard: TokenStandard::V1.to_string(), + }; + return Ok(Some(( + coin_balance, + current_coin_balance, + AHashMap::default(), + ))); + } + } + Ok(None) + } + /// Getting coin balances from resources for v1 /// If the fully qualified coin type is too long (currently 1000 length), we exclude from indexing pub fn get_v1_from_write_resource( @@ -178,17 +313,8 @@ impl FungibleAssetBalance { metadata_address: &str, fungible_store_address: &str, ) -> bool { - let owner_address_bytes = <[u8; 32]>::from_hex(&owner_address[2..]).unwrap(); - let metadata_address_bytes = <[u8; 32]>::from_hex(&metadata_address[2..]).unwrap(); - - // construct the expected metadata address - let mut hasher = Sha3_256::new(); - hasher.update(owner_address_bytes); - hasher.update(metadata_address_bytes); - hasher.update([0xFC]); - let hash_result = hasher.finalize(); - // compare address to actual metadata address - hex::encode(hash_result) == fungible_store_address[2..] + fungible_store_address + == get_primary_fungible_store_address(owner_address, metadata_address).unwrap() } } @@ -236,4 +362,13 @@ mod tests { fungible_store_address, )); } + + #[test] + fn test_paired_metadata_address() { + assert_eq!( + get_paired_metadata_address("0x1::aptos_coin::AptosCoin"), + *APT_METADATA_ADDRESS_HEX + ); + assert_eq!(get_paired_metadata_address("0x66c34778730acbb120cefa57a3d98fd21e0c8b3a51e9baee530088b2e444e94c::moon_coin::MoonCoin"), "0xf772c28c069aa7e4417d85d771957eb3c5c11b5bf90b1965cda23b899ebc0384"); + } } diff --git a/rust/processor/src/models/fungible_asset_models/v2_fungible_asset_utils.rs b/rust/processor/src/models/fungible_asset_models/v2_fungible_asset_utils.rs index da43ca2c0..102dfe7c6 100644 --- a/rust/processor/src/models/fungible_asset_models/v2_fungible_asset_utils.rs +++ b/rust/processor/src/models/fungible_asset_models/v2_fungible_asset_utils.rs @@ -59,7 +59,7 @@ impl FungibleAssetMetadata { write_resource: &WriteResource, txn_version: i64, ) -> anyhow::Result> { - let type_str = MoveResource::get_outer_type_from_resource(write_resource); + let type_str = MoveResource::get_outer_type_from_write_resource(write_resource); if !V2FungibleAssetResource::is_resource_supported(type_str.as_str()) { return Ok(None); } @@ -113,7 +113,7 @@ impl FungibleAssetStore { write_resource: &WriteResource, txn_version: i64, ) -> anyhow::Result> { - let type_str = MoveResource::get_outer_type_from_resource(write_resource); + let type_str = MoveResource::get_outer_type_from_write_resource(write_resource); if !V2FungibleAssetResource::is_resource_supported(type_str.as_str()) { return Ok(None); } @@ -158,7 +158,7 @@ impl FungibleAssetSupply { write_resource: &WriteResource, txn_version: i64, ) -> anyhow::Result> { - let type_str: String = MoveResource::get_outer_type_from_resource(write_resource); + let type_str: String = MoveResource::get_outer_type_from_write_resource(write_resource); if !V2FungibleAssetResource::is_resource_supported(type_str.as_str()) { return Ok(None); } diff --git a/rust/processor/src/models/fungible_asset_models/v2_fungible_metadata.rs b/rust/processor/src/models/fungible_asset_models/v2_fungible_metadata.rs index 1acb60f76..97622391a 100644 --- a/rust/processor/src/models/fungible_asset_models/v2_fungible_metadata.rs +++ b/rust/processor/src/models/fungible_asset_models/v2_fungible_metadata.rs @@ -16,7 +16,7 @@ use crate::{ utils::util::standardize_address, }; use ahash::AHashMap; -use aptos_protos::transaction::v1::WriteResource; +use aptos_protos::transaction::v1::{DeleteResource, WriteResource}; use bigdecimal::BigDecimal; use diesel::prelude::*; use field_count::FieldCount; @@ -139,4 +139,47 @@ impl FungibleAssetMetadataModel { _ => Ok(None), } } + + pub fn get_v1_from_delete_resource( + delete_resource: &DeleteResource, + txn_version: i64, + txn_timestamp: chrono::NaiveDateTime, + ) -> anyhow::Result> { + match &CoinResource::from_delete_resource(delete_resource, txn_version)? { + Some(CoinResource::CoinInfoResource(inner)) => { + let coin_info_type = &CoinInfoType::from_move_type( + &delete_resource.r#type.as_ref().unwrap().generic_type_params[0], + delete_resource.type_str.as_ref(), + txn_version, + ); + let (supply_aggregator_table_handle, supply_aggregator_table_key) = inner + .get_aggregator_metadata() + .map(|agg| (Some(agg.handle), Some(agg.key))) + .unwrap_or((None, None)); + // If asset type is too long, just ignore + if let Some(asset_type) = coin_info_type.get_coin_type_below_max() { + Ok(Some(Self { + asset_type, + creator_address: coin_info_type.get_creator_address(), + name: inner.get_name_trunc(), + symbol: inner.get_symbol_trunc(), + decimals: inner.decimals, + icon_uri: None, + project_uri: None, + last_transaction_version: txn_version, + last_transaction_timestamp: txn_timestamp, + supply_aggregator_table_handle_v1: supply_aggregator_table_handle, + supply_aggregator_table_key_v1: supply_aggregator_table_key, + token_standard: TokenStandard::V1.to_string(), + is_token_v2: None, + supply_v2: None, + maximum_v2: None, + })) + } else { + Ok(None) + } + }, + _ => Ok(None), + } + } } diff --git a/rust/processor/src/models/object_models/v2_object_utils.rs b/rust/processor/src/models/object_models/v2_object_utils.rs index 95935f833..548e56f70 100644 --- a/rust/processor/src/models/object_models/v2_object_utils.rs +++ b/rust/processor/src/models/object_models/v2_object_utils.rs @@ -103,7 +103,7 @@ impl ObjectWithMetadata { write_resource: &WriteResource, txn_version: i64, ) -> anyhow::Result> { - let type_str = MoveResource::get_outer_type_from_resource(write_resource); + let type_str = MoveResource::get_outer_type_from_write_resource(write_resource); if !V2TokenResource::is_resource_supported(type_str.as_str()) { return Ok(None); } diff --git a/rust/processor/src/models/stake_models/stake_utils.rs b/rust/processor/src/models/stake_models/stake_utils.rs index 40cf75fc2..6c91364d6 100644 --- a/rust/processor/src/models/stake_models/stake_utils.rs +++ b/rust/processor/src/models/stake_models/stake_utils.rs @@ -167,7 +167,7 @@ impl StakeResource { write_resource: &WriteResource, txn_version: i64, ) -> Result> { - let type_str = MoveResource::get_outer_type_from_resource(write_resource); + let type_str = MoveResource::get_outer_type_from_write_resource(write_resource); if !Self::is_resource_supported(type_str.as_str()) { return Ok(None); } @@ -323,7 +323,7 @@ impl DelegationVoteGovernanceRecordsResource { write_resource: &WriteResource, txn_version: i64, ) -> Result> { - let type_str = MoveResource::get_outer_type_from_resource(write_resource); + let type_str = MoveResource::get_outer_type_from_write_resource(write_resource); let resource = MoveResource::from_write_resource( write_resource, 0, // Placeholder, this isn't used anyway diff --git a/rust/processor/src/models/token_models/tokens.rs b/rust/processor/src/models/token_models/tokens.rs index 2f244e9b9..a6703ea3e 100644 --- a/rust/processor/src/models/token_models/tokens.rs +++ b/rust/processor/src/models/token_models/tokens.rs @@ -419,7 +419,7 @@ impl TableMetadataForToken { write_resource: &WriteResource, txn_version: i64, ) -> anyhow::Result> { - let type_str = MoveResource::get_outer_type_from_resource(write_resource); + let type_str = MoveResource::get_outer_type_from_write_resource(write_resource); if !TokenResource::is_resource_supported(type_str.as_str()) { return Ok(None); } diff --git a/rust/processor/src/models/token_v2_models/v2_collections.rs b/rust/processor/src/models/token_v2_models/v2_collections.rs index f40f4551f..66eabcc2a 100644 --- a/rust/processor/src/models/token_v2_models/v2_collections.rs +++ b/rust/processor/src/models/token_v2_models/v2_collections.rs @@ -85,7 +85,7 @@ impl CollectionV2 { txn_timestamp: chrono::NaiveDateTime, object_metadatas: &ObjectAggregatedDataMapping, ) -> anyhow::Result> { - let type_str = MoveResource::get_outer_type_from_resource(write_resource); + let type_str = MoveResource::get_outer_type_from_write_resource(write_resource); if !V2TokenResource::is_resource_supported(type_str.as_str()) { return Ok(None); } diff --git a/rust/processor/src/models/token_v2_models/v2_token_utils.rs b/rust/processor/src/models/token_v2_models/v2_token_utils.rs index 2b6f25243..88f68f140 100644 --- a/rust/processor/src/models/token_v2_models/v2_token_utils.rs +++ b/rust/processor/src/models/token_v2_models/v2_token_utils.rs @@ -21,6 +21,7 @@ use ahash::{AHashMap, AHashSet}; use anyhow::{Context, Result}; use aptos_protos::transaction::v1::{Event, WriteResource}; use bigdecimal::BigDecimal; +use lazy_static::lazy_static; use serde::{Deserialize, Serialize}; use std::fmt::{self, Formatter}; @@ -29,6 +30,10 @@ pub const TOKEN_V2_ADDR: &str = pub const DEFAULT_OWNER_ADDRESS: &str = "unknown"; +lazy_static! { + pub static ref V2_STANDARD: String = TokenStandard::V2.to_string(); +} + /// Tracks all token related data in a hashmap for quick access (keyed on address of the object core) /// Maps address to burn event. If it's an old event previous_owner will be empty pub type TokenV2Burned = AHashMap; @@ -87,7 +92,7 @@ impl AptosCollection { write_resource: &WriteResource, txn_version: i64, ) -> anyhow::Result> { - let type_str = MoveResource::get_outer_type_from_resource(write_resource); + let type_str = MoveResource::get_outer_type_from_write_resource(write_resource); if !V2TokenResource::is_resource_supported(type_str.as_str()) { return Ok(None); } @@ -134,7 +139,7 @@ impl TokenV2 { write_resource: &WriteResource, txn_version: i64, ) -> anyhow::Result> { - let type_str = MoveResource::get_outer_type_from_resource(write_resource); + let type_str = MoveResource::get_outer_type_from_write_resource(write_resource); if !V2TokenResource::is_resource_supported(type_str.as_str()) { return Ok(None); } @@ -191,7 +196,7 @@ impl FixedSupply { write_resource: &WriteResource, txn_version: i64, ) -> anyhow::Result> { - let type_str = MoveResource::get_outer_type_from_resource(write_resource); + let type_str = MoveResource::get_outer_type_from_write_resource(write_resource); if !V2TokenResource::is_resource_supported(type_str.as_str()) { return Ok(None); } @@ -225,7 +230,7 @@ impl UnlimitedSupply { write_resource: &WriteResource, txn_version: i64, ) -> anyhow::Result> { - let type_str = MoveResource::get_outer_type_from_resource(write_resource); + let type_str = MoveResource::get_outer_type_from_write_resource(write_resource); if !V2TokenResource::is_resource_supported(type_str.as_str()) { return Ok(None); } @@ -257,7 +262,7 @@ impl ConcurrentSupply { write_resource: &WriteResource, txn_version: i64, ) -> anyhow::Result> { - let type_str = MoveResource::get_outer_type_from_resource(write_resource); + let type_str = MoveResource::get_outer_type_from_write_resource(write_resource); if !V2TokenResource::is_resource_supported(type_str.as_str()) { return Ok(None); } @@ -431,7 +436,7 @@ impl PropertyMapModel { write_resource: &WriteResource, txn_version: i64, ) -> anyhow::Result> { - let type_str = MoveResource::get_outer_type_from_resource(write_resource); + let type_str = MoveResource::get_outer_type_from_write_resource(write_resource); if !V2TokenResource::is_resource_supported(type_str.as_str()) { return Ok(None); } @@ -462,7 +467,7 @@ impl TokenIdentifiers { write_resource: &WriteResource, txn_version: i64, ) -> anyhow::Result> { - let type_str = MoveResource::get_outer_type_from_resource(write_resource); + let type_str = MoveResource::get_outer_type_from_write_resource(write_resource); if !V2TokenResource::is_resource_supported(type_str.as_str()) { return Ok(None); } diff --git a/rust/processor/src/processors/coin_processor.rs b/rust/processor/src/processors/coin_processor.rs index 6a08a9de3..77117fa2b 100644 --- a/rust/processor/src/processors/coin_processor.rs +++ b/rust/processor/src/processors/coin_processor.rs @@ -27,8 +27,6 @@ use diesel::{ use std::fmt::Debug; use tracing::error; -pub const APTOS_COIN_TYPE_STR: &str = "0x1::aptos_coin::AptosCoin"; - pub struct CoinProcessor { connection_pool: PgDbPool, per_table_chunk_sizes: AHashMap, diff --git a/rust/processor/src/processors/fungible_asset_processor.rs b/rust/processor/src/processors/fungible_asset_processor.rs index c5f303b17..38636ccb4 100644 --- a/rust/processor/src/processors/fungible_asset_processor.rs +++ b/rust/processor/src/processors/fungible_asset_processor.rs @@ -7,7 +7,8 @@ use crate::{ fungible_asset_models::{ v2_fungible_asset_activities::{EventToCoinType, FungibleAssetActivity}, v2_fungible_asset_balances::{ - CurrentFungibleAssetBalance, CurrentFungibleAssetMapping, FungibleAssetBalance, + CurrentFungibleAssetBalance, CurrentFungibleAssetMapping, + CurrentUnifiedFungibleAssetBalance, FungibleAssetBalance, }, v2_fungible_asset_utils::{ FeeStatement, FungibleAssetMetadata, FungibleAssetStore, FungibleAssetSupply, @@ -38,8 +39,6 @@ use diesel::{ use std::fmt::Debug; use tracing::error; -pub const APTOS_COIN_TYPE_STR: &str = "0x1::aptos_coin::AptosCoin"; - pub struct FungibleAssetProcessor { connection_pool: PgDbPool, per_table_chunk_sizes: AHashMap, @@ -74,6 +73,10 @@ async fn insert_to_db( fungible_asset_metadata: &[FungibleAssetMetadataModel], fungible_asset_balances: &[FungibleAssetBalance], current_fungible_asset_balances: &[CurrentFungibleAssetBalance], + current_unified_fungible_asset_balances: ( + &[CurrentUnifiedFungibleAssetBalance], + &[CurrentUnifiedFungibleAssetBalance], + ), per_table_chunk_sizes: &AHashMap, ) -> Result<(), diesel::result::Error> { tracing::trace!( @@ -111,7 +114,7 @@ async fn insert_to_db( ), ); let cfab = execute_in_chunks( - conn, + conn.clone(), insert_current_fungible_asset_balances_query, current_fungible_asset_balances, get_config_table_chunk_size::( @@ -119,8 +122,28 @@ async fn insert_to_db( per_table_chunk_sizes, ), ); - let (faa_res, fam_res, fab_res, cfab_res) = tokio::join!(faa, fam, fab, cfab); - for res in [faa_res, fam_res, fab_res, cfab_res] { + + let cufab_v1 = execute_in_chunks( + conn.clone(), + insert_current_unified_fungible_asset_balances_v1_query, + current_unified_fungible_asset_balances.0, + get_config_table_chunk_size::( + "current_united_fungible_asset_balances", + per_table_chunk_sizes, + ), + ); + let cufab_v2 = execute_in_chunks( + conn, + insert_current_unified_fungible_asset_balances_v2_query, + current_unified_fungible_asset_balances.1, + get_config_table_chunk_size::( + "current_united_fungible_asset_balances", + per_table_chunk_sizes, + ), + ); + let (faa_res, fam_res, fab_res, cfab_res, cufab1_res, cufab2_res) = + tokio::join!(faa, fam, fab, cfab, cufab_v1, cufab_v2); + for res in [faa_res, fam_res, fab_res, cfab_res, cufab1_res, cufab2_res] { res?; } @@ -227,6 +250,63 @@ fn insert_current_fungible_asset_balances_query( ) } +fn insert_current_unified_fungible_asset_balances_v1_query( + items_to_insert: Vec, +) -> ( + impl QueryFragment + diesel::query_builder::QueryId + Send, + Option<&'static str>, +) { + use schema::current_unified_fungible_asset_balances::dsl::*; + + ( + diesel::insert_into(schema::current_unified_fungible_asset_balances::table) + .values(items_to_insert) + .on_conflict(storage_id) + .do_update() + .set( + ( + owner_address.eq(excluded(owner_address)), + asset_type.eq(excluded(asset_type)), + coin_type.eq(excluded(coin_type)), + is_frozen.eq(excluded(is_frozen)), + amount_v1.eq(excluded(amount_v1)), + last_transaction_timestamp_v1.eq(excluded(last_transaction_timestamp_v1)), + last_transaction_version_v1.eq(excluded(last_transaction_version_v1)), + inserted_at.eq(excluded(inserted_at)), + ) + ), + Some(" WHERE current_unified_fungible_asset_balances.last_transaction_version_v1 IS NULL OR current_unified_fungible_asset_balances.last_transaction_version_v1 <= excluded.last_transaction_version_v1"), + ) +} + +fn insert_current_unified_fungible_asset_balances_v2_query( + items_to_insert: Vec, +) -> ( + impl QueryFragment + diesel::query_builder::QueryId + Send, + Option<&'static str>, +) { + use schema::current_unified_fungible_asset_balances::dsl::*; + ( + diesel::insert_into(schema::current_unified_fungible_asset_balances::table) + .values(items_to_insert) + .on_conflict(storage_id) + .do_update() + .set( + ( + owner_address.eq(excluded(owner_address)), + asset_type.eq(excluded(asset_type)), + is_primary.eq(excluded(is_primary)), + is_frozen.eq(excluded(is_frozen)), + amount_v2.eq(excluded(amount_v2)), + last_transaction_timestamp_v2.eq(excluded(last_transaction_timestamp_v2)), + last_transaction_version_v2.eq(excluded(last_transaction_version_v2)), + inserted_at.eq(excluded(inserted_at)), + ) + ), + Some(" WHERE current_unified_fungible_asset_balances.last_transaction_version_v2 IS NULL OR current_unified_fungible_asset_balances.last_transaction_version_v2 <= excluded.last_transaction_version_v2 "), + ) +} + #[async_trait] impl ProcessorTrait for FungibleAssetProcessor { fn name(&self) -> &'static str { @@ -248,11 +328,15 @@ impl ProcessorTrait for FungibleAssetProcessor { fungible_asset_metadata, fungible_asset_balances, current_fungible_asset_balances, + current_unified_fungible_asset_balances, ) = parse_v2_coin(&transactions).await; let processing_duration_in_secs = processing_start.elapsed().as_secs_f64(); let db_insertion_start = std::time::Instant::now(); + let (coin_balance, fa_balance): (Vec<_>, Vec<_>) = current_unified_fungible_asset_balances + .into_iter() + .partition(|x| x.is_primary.is_none()); let tx_result = insert_to_db( self.get_pool(), self.name(), @@ -262,6 +346,7 @@ impl ProcessorTrait for FungibleAssetProcessor { &fungible_asset_metadata, &fungible_asset_balances, ¤t_fungible_asset_balances, + (&coin_balance, &fa_balance), &self.per_table_chunk_sizes, ) .await; @@ -300,6 +385,7 @@ async fn parse_v2_coin( Vec, Vec, Vec, + Vec, ) { let mut fungible_asset_activities = vec![]; let mut fungible_asset_balances = vec![]; @@ -410,6 +496,21 @@ async fn parse_v2_coin( aggregated_data.fungible_asset_supply = Some(fungible_asset_supply); } } + } else if let Change::DeleteResource(delete_resource) = wsc.change.as_ref().unwrap() { + if let Some((balance, current_balance, event_to_coin)) = + FungibleAssetBalance::get_v1_from_delete_resource( + delete_resource, + index as i64, + txn_version, + txn_timestamp, + ) + .unwrap() + { + fungible_asset_balances.push(balance); + current_fungible_asset_balances + .insert(current_balance.storage_id.clone(), current_balance.clone()); + event_to_v1_coin_type.extend(event_to_coin); + } } } @@ -542,14 +643,21 @@ async fn parse_v2_coin( let mut current_fungible_asset_balances = current_fungible_asset_balances .into_values() .collect::>(); + // Sort by PK fungible_asset_metadata.sort_by(|a, b| a.asset_type.cmp(&b.asset_type)); current_fungible_asset_balances.sort_by(|a, b| a.storage_id.cmp(&b.storage_id)); + // Process the unified balance + let current_unified_fungible_asset_balances = current_fungible_asset_balances + .iter() + .map(CurrentUnifiedFungibleAssetBalance::from) + .collect::>(); ( fungible_asset_activities, fungible_asset_metadata, fungible_asset_balances, current_fungible_asset_balances, + current_unified_fungible_asset_balances, ) } diff --git a/rust/processor/src/schema.rs b/rust/processor/src/schema.rs index 45fab0e8f..b50808303 100644 --- a/rust/processor/src/schema.rs +++ b/rust/processor/src/schema.rs @@ -634,6 +634,31 @@ diesel::table! { } } +diesel::table! { + current_unified_fungible_asset_balances (storage_id) { + #[max_length = 66] + storage_id -> Varchar, + #[max_length = 66] + owner_address -> Varchar, + #[max_length = 66] + asset_type -> Varchar, + #[max_length = 1000] + coin_type -> Nullable, + is_primary -> Nullable, + is_frozen -> Bool, + amount_v1 -> Nullable, + amount_v2 -> Nullable, + amount -> Nullable, + last_transaction_version_v1 -> Nullable, + last_transaction_version_v2 -> Nullable, + last_transaction_version -> Nullable, + last_transaction_timestamp_v1 -> Nullable, + last_transaction_timestamp_v2 -> Nullable, + last_transaction_timestamp -> Nullable, + inserted_at -> Timestamp, + } +} + diesel::table! { delegated_staking_activities (transaction_version, event_index) { transaction_version -> Int8, @@ -1266,6 +1291,7 @@ diesel::allow_tables_to_appear_in_same_query!( current_token_ownerships_v2, current_token_pending_claims, current_token_v2_metadata, + current_unified_fungible_asset_balances, delegated_staking_activities, delegated_staking_pool_balances, delegated_staking_pools, diff --git a/rust/processor/src/utils/util.rs b/rust/processor/src/utils/util.rs index 14f750322..b4f342c57 100644 --- a/rust/processor/src/utils/util.rs +++ b/rust/processor/src/utils/util.rs @@ -15,16 +15,29 @@ use aptos_protos::{ util::timestamp::Timestamp, }; use bigdecimal::{BigDecimal, Signed, ToPrimitive, Zero}; +use lazy_static::lazy_static; use serde::{Deserialize, Deserializer, Serialize}; use serde_json::Value; use sha2::Digest; use std::str::FromStr; +use tiny_keccak::{Hasher, Sha3}; // 9999-12-31 23:59:59, this is the max supported by Google BigQuery pub const MAX_TIMESTAMP_SECS: i64 = 253_402_300_799; // Max length of entry function id string to ensure that db doesn't explode pub const MAX_ENTRY_FUNCTION_LENGTH: usize = 1000; +pub const APTOS_COIN_TYPE_STR: &str = "0x1::aptos_coin::AptosCoin"; + +lazy_static! { + pub static ref APT_METADATA_ADDRESS_RAW: [u8; 32] = { + let mut addr = [0u8; 32]; + addr[31] = 10u8; + addr + }; + pub static ref APT_METADATA_ADDRESS_HEX: String = + format!("0x{}", hex::encode(*APT_METADATA_ADDRESS_RAW)); +} // Supporting structs to get clean payload without escaped strings #[derive(Debug, Deserialize, Serialize)] pub struct EntryFunctionPayloadClean { @@ -65,6 +78,14 @@ pub fn hash_str(val: &str) -> String { hex::encode(sha2::Sha256::digest(val.as_bytes())) } +pub fn sha3_256(buffer: &[u8]) -> [u8; 32] { + let mut output = [0; 32]; + let mut sha3 = Sha3::v256(); + sha3.update(buffer); + sha3.finalize(&mut output); + output +} + pub fn truncate_str(val: &str, max_chars: usize) -> String { let mut trunc = val.to_string(); trunc.truncate(max_chars); @@ -328,7 +349,7 @@ where D: Deserializer<'de>, { let s = ::deserialize(deserializer)?; - Ok(convert_hex(s.clone()).unwrap_or(s)) + Ok(String::from_utf8(hex_to_raw_bytes(&s).unwrap()).unwrap_or(s)) } /// Convert the bcs serialized vector to its original string format @@ -388,10 +409,9 @@ pub fn convert_bcs_token_object_propertymap(s: Value) -> Option { } } -/// Convert the vector that is directly generated from b"xxx" -pub fn convert_hex(val: String) -> Option { - let decoded = hex::decode(val.strip_prefix("0x").unwrap_or(&*val)).ok()?; - String::from_utf8(decoded).ok() +/// Convert from hex string to raw byte string +pub fn hex_to_raw_bytes(val: &str) -> anyhow::Result> { + Ok(hex::decode(val.strip_prefix("0x").unwrap_or(val))?) } /// Deserialize from string to type T