From 9157f6fdd74c6ee92598a36ea0480540766019b2 Mon Sep 17 00:00:00 2001
From: bowenyang007
Date: Thu, 13 Jul 2023 00:35:49 -0700
Subject: [PATCH] add indexer grpc, add better logs for storage indexer

---
 .../src/models/token_models/v2_collections.rs |  2 +-
 .../token_models/v2_token_activities.rs       |  2 +-
 .../src/models/token_models/v2_token_utils.rs |  4 +-
 .../indexer-grpc-parser/README.md             |  4 +-
 .../down.sql                                  |  2 +
 .../up.sql                                    | 29 ++++++
 .../indexer-grpc-parser/parser.yaml           |  2 +-
 .../indexer-grpc-parser/src/main.rs           |  4 +-
 .../coin_models/v2_fungible_asset_utils.rs    | 96 ++++++++++++++++---
 .../src/models/token_models/v2_collections.rs |  8 +-
 .../token_models/v2_token_activities.rs       | 87 ++++++++++++++++-
 .../src/models/token_models/v2_token_datas.rs | 41 +++++++-
 .../src/models/token_models/v2_token_utils.rs |  6 +-
 .../src/processors/token_processor.rs         | 26 ++++-
 .../indexer-grpc-parser/src/worker.rs         | 22 ++---
 storage/indexer/src/lib.rs                    | 14 ++-
 16 files changed, 306 insertions(+), 43 deletions(-)
 create mode 100644 ecosystem/indexer-grpc/indexer-grpc-parser/migrations/2023-07-06-042159_minor_optimizations/down.sql
 create mode 100644 ecosystem/indexer-grpc/indexer-grpc-parser/migrations/2023-07-06-042159_minor_optimizations/up.sql

diff --git a/crates/indexer/src/models/token_models/v2_collections.rs b/crates/indexer/src/models/token_models/v2_collections.rs
index a9669ee58762e..57396b9b0f62e 100644
--- a/crates/indexer/src/models/token_models/v2_collections.rs
+++ b/crates/indexer/src/models/token_models/v2_collections.rs
@@ -282,7 +282,7 @@ impl CollectionV2 {
     }
 
     /// TODO: Change this to a KV store
-    pub fn get_by_table_handle(
+    fn get_by_table_handle(
         conn: &mut PgPoolConnection,
         table_handle: &str,
     ) -> anyhow::Result<String> {
diff --git a/crates/indexer/src/models/token_models/v2_token_activities.rs b/crates/indexer/src/models/token_models/v2_token_activities.rs
index 30896d41bcf58..5d7caf2af4951 100644
--- a/crates/indexer/src/models/token_models/v2_token_activities.rs
+++ b/crates/indexer/src/models/token_models/v2_token_activities.rs
@@ -87,7 +87,7 @@ impl TokenActivityV2 {
         let fungible_asset = metadata.fungible_asset_store.as_ref().unwrap();
         let maybe_token_data_id = fungible_asset.metadata.get_reference_address();
         // Now we try to see if the fungible asset is actually a token.
If it's not token, return early - let is_token = if let Some(_) = token_v2_metadata.get(&maybe_token_data_id) { + let is_token = if token_v2_metadata.get(&maybe_token_data_id).is_some() { true } else { // Look up in the db diff --git a/crates/indexer/src/models/token_models/v2_token_utils.rs b/crates/indexer/src/models/token_models/v2_token_utils.rs index 80a122aa283a7..aa525d20a5bd4 100644 --- a/crates/indexer/src/models/token_models/v2_token_utils.rs +++ b/crates/indexer/src/models/token_models/v2_token_utils.rs @@ -7,7 +7,9 @@ use super::token_utils::{NAME_LENGTH, URI_LENGTH}; use crate::{ models::{ - coin_models::v2_fungible_asset_utils::{FungibleAssetMetadata, FungibleAssetSupply, FungibleAssetStore}, + coin_models::v2_fungible_asset_utils::{ + FungibleAssetMetadata, FungibleAssetStore, FungibleAssetSupply, + }, move_resources::MoveResource, v2_objects::CurrentObjectPK, }, diff --git a/ecosystem/indexer-grpc/indexer-grpc-parser/README.md b/ecosystem/indexer-grpc/indexer-grpc-parser/README.md index 4bee514482f15..b3a06b8c13592 100644 --- a/ecosystem/indexer-grpc/indexer-grpc-parser/README.md +++ b/ecosystem/indexer-grpc/indexer-grpc-parser/README.md @@ -16,7 +16,7 @@ Indexer GRPC parser is to indexer data processor that leverages the indexer grpc server_config: processor_name: default_processor postgres_connection_string: postgresql://postgres:@localhost:5432/postgres_v2 - indexer_grpc_data_service_addresss: 127.0.0.1:50051 + indexer_grpc_data_service_address: 127.0.0.1:50051 indexer_grpc_http2_ping_interval_in_secs: 60 indexer_grpc_http2_ping_timeout_in_secs: 10 auth_token: AUTH_TOKEN @@ -26,7 +26,7 @@ Indexer GRPC parser is to indexer data processor that leverages the indexer grpc * `processor_name`: purpose of this processor; also used for monitoring purpose. * `postgres_connection_string`: PostgresQL DB connection string -* `indexer_grpc_data_service_addresss`: Data service non-TLS endpoint address. +* `indexer_grpc_data_service_address`: Data service non-TLS endpoint address. * `indexer_grpc_http2_ping_interval_in_secs`: client-side grpc HTTP2 ping interval. * `indexer_grpc_http2_ping_timeout_in_secs`: client-side grpc HTTP2 ping timeout. * `auth_token`: Auth token used for connection. 
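As a quick illustration of how the keys above line up with the code, the `server_config` block deserializes field-for-field into the `IndexerGrpcProcessorConfig` struct whose field this patch renames (see `src/main.rs` below). A minimal sketch of that mapping, assuming `serde` and `serde_yaml` as the deserialization layer; the actual binary loads the file through its server framework, so the `ServerConfig` name and the standalone loader here are hypothetical:

```rust
// Hypothetical standalone loader for the `server_config` block above.
// Field names mirror the README; serde and serde_yaml are assumed dependencies.
use serde::Deserialize;

#[derive(Debug, Deserialize)]
struct ServerConfig {
    processor_name: String,
    postgres_connection_string: String,
    indexer_grpc_data_service_address: String,
    // Optional: src/main.rs falls back to 30 for the ping interval when unset.
    indexer_grpc_http2_ping_interval_in_secs: Option<u64>,
    indexer_grpc_http2_ping_timeout_in_secs: Option<u64>,
    auth_token: String,
}

fn main() -> Result<(), serde_yaml::Error> {
    let yaml = r#"
processor_name: default_processor
postgres_connection_string: postgresql://postgres:@localhost:5432/postgres_v2
indexer_grpc_data_service_address: 127.0.0.1:50051
indexer_grpc_http2_ping_interval_in_secs: 60
indexer_grpc_http2_ping_timeout_in_secs: 10
auth_token: AUTH_TOKEN
"#;
    let config: ServerConfig = serde_yaml::from_str(yaml)?;
    println!("parsed: {:?}", config);
    Ok(())
}
```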
diff --git a/ecosystem/indexer-grpc/indexer-grpc-parser/migrations/2023-07-06-042159_minor_optimizations/down.sql b/ecosystem/indexer-grpc/indexer-grpc-parser/migrations/2023-07-06-042159_minor_optimizations/down.sql
new file mode 100644
index 0000000000000..81190795c2fa0
--- /dev/null
+++ b/ecosystem/indexer-grpc/indexer-grpc-parser/migrations/2023-07-06-042159_minor_optimizations/down.sql
@@ -0,0 +1,2 @@
+-- This file should undo anything in `up.sql`
+DROP INDEX IF EXISTS mr_ver_index;
\ No newline at end of file
diff --git a/ecosystem/indexer-grpc/indexer-grpc-parser/migrations/2023-07-06-042159_minor_optimizations/up.sql b/ecosystem/indexer-grpc/indexer-grpc-parser/migrations/2023-07-06-042159_minor_optimizations/up.sql
new file mode 100644
index 0000000000000..c40d650f08008
--- /dev/null
+++ b/ecosystem/indexer-grpc/indexer-grpc-parser/migrations/2023-07-06-042159_minor_optimizations/up.sql
@@ -0,0 +1,29 @@
+-- Your SQL goes here
+-- This is needed to improve performance when querying an account with a large number of transactions
+CREATE INDEX IF NOT EXISTS mr_ver_index ON move_resources(transaction_version DESC);
+-- These are needed b/c for some reason we're getting build errors when setting
+-- type field with a length limit
+ALTER TABLE signatures
+ALTER COLUMN type TYPE VARCHAR;
+ALTER TABLE token_activities_v2
+ALTER COLUMN type TYPE VARCHAR;
+DROP VIEW IF EXISTS transactions_view;
+ALTER TABLE transactions
+ALTER COLUMN type TYPE VARCHAR;
+CREATE VIEW transactions_view AS
+SELECT "version",
+    block_height,
+    "hash",
+    "type",
+    payload#>>'{}' AS json_payload,
+    state_change_hash,
+    event_root_hash,
+    state_checkpoint_hash,
+    gas_used,
+    success,
+    vm_status,
+    accumulator_root_hash,
+    num_events,
+    num_write_set_changes,
+    inserted_at
+FROM transactions;
\ No newline at end of file
diff --git a/ecosystem/indexer-grpc/indexer-grpc-parser/parser.yaml b/ecosystem/indexer-grpc/indexer-grpc-parser/parser.yaml
index 079f073e26af7..1bdb9c6a73736
--- a/ecosystem/indexer-grpc/indexer-grpc-parser/parser.yaml
+++ b/ecosystem/indexer-grpc/indexer-grpc-parser/parser.yaml
@@ -3,5 +3,5 @@ health_check_port: 8084
 server_config:
   processor_name: default_processor
   postgres_connection_string: postgresql://postgres:@localhost:5432/postgres_v2
-  indexer_grpc_data_service_addresss: 127.0.0.1:50051:50051
+  indexer_grpc_data_service_address: 127.0.0.1:50051
   auth_token: AUTH_TOKEN
\ No newline at end of file
diff --git a/ecosystem/indexer-grpc/indexer-grpc-parser/src/main.rs b/ecosystem/indexer-grpc/indexer-grpc-parser/src/main.rs
index af40fdbde69aa..2171b9dbc538e 100644
--- a/ecosystem/indexer-grpc/indexer-grpc-parser/src/main.rs
+++ b/ecosystem/indexer-grpc/indexer-grpc-parser/src/main.rs
@@ -13,7 +13,7 @@ pub struct IndexerGrpcProcessorConfig {
     pub processor_name: String,
     pub postgres_connection_string: String,
     // TODO: add tls support.
-    pub indexer_grpc_data_service_addresss: String,
+    pub indexer_grpc_data_service_address: String,
     // Indexer GRPC http2 ping interval in seconds; default to 30.
    // tonic ref: https://docs.rs/tonic/latest/tonic/transport/channel/struct.Endpoint.html#method.http2_keep_alive_interval
     pub indexer_grpc_http2_ping_interval_in_secs: Option<u64>,
@@ -33,7 +33,7 @@ impl RunnableConfig for IndexerGrpcProcessorConfig {
         let mut worker = Worker::new(
             self.processor_name.clone(),
             self.postgres_connection_string.clone(),
-            self.indexer_grpc_data_service_addresss.clone(),
+            self.indexer_grpc_data_service_address.clone(),
             std::time::Duration::from_secs(
                 self.indexer_grpc_http2_ping_interval_in_secs.unwrap_or(30),
             ),
diff --git a/ecosystem/indexer-grpc/indexer-grpc-parser/src/models/coin_models/v2_fungible_asset_utils.rs b/ecosystem/indexer-grpc/indexer-grpc-parser/src/models/coin_models/v2_fungible_asset_utils.rs
index 1362323006d93..510094b7bac5e 100644
--- a/ecosystem/indexer-grpc/indexer-grpc-parser/src/models/coin_models/v2_fungible_asset_utils.rs
+++ b/ecosystem/indexer-grpc/indexer-grpc-parser/src/models/coin_models/v2_fungible_asset_utils.rs
@@ -4,6 +4,7 @@
 // This is required because a diesel macro makes clippy sad
 #![allow(clippy::extra_unused_lifetimes)]
 
+use super::coin_utils::COIN_ADDR;
 use crate::{
     models::{
         default_models::move_resources::MoveResource,
@@ -83,6 +84,36 @@ pub struct FungibleAssetStore {
     pub frozen: bool,
 }
 
+impl FungibleAssetStore {
+    pub fn from_write_resource(
+        write_resource: &WriteResource,
+        txn_version: i64,
+    ) -> anyhow::Result<Option<Self>> {
+        let type_str = MoveResource::get_outer_type_from_resource(write_resource);
+        if !V2FungibleAssetResource::is_resource_supported(type_str.as_str()) {
+            return Ok(None);
+        }
+        let resource = MoveResource::from_write_resource(
+            write_resource,
+            0, // Placeholder, this isn't used anyway
+            txn_version,
+            0, // Placeholder, this isn't used anyway
+        );
+
+        if let V2FungibleAssetResource::FungibleAssetStore(inner) =
+            V2FungibleAssetResource::from_resource(
+                &type_str,
+                resource.data.as_ref().unwrap(),
+                txn_version,
+            )?
+        {
+            Ok(Some(inner))
+        } else {
+            Ok(None)
+        }
+    }
+}
+
 #[derive(Serialize, Deserialize, Debug, Clone)]
 pub struct FungibleAssetSupply {
     #[serde(deserialize_with = "deserialize_from_string")]
     pub current: BigDecimal,
@@ -132,6 +163,18 @@ impl FungibleAssetSupply {
     }
 }
 
+#[derive(Serialize, Deserialize, Debug, Clone)]
+pub struct DepositEvent {
+    #[serde(deserialize_with = "deserialize_from_string")]
+    pub amount: BigDecimal,
+}
+
+#[derive(Serialize, Deserialize, Debug, Clone)]
+pub struct WithdrawEvent {
+    #[serde(deserialize_with = "deserialize_from_string")]
+    pub amount: BigDecimal,
+}
+
 #[derive(Serialize, Deserialize, Debug, Clone)]
 pub enum V2FungibleAssetResource {
     FungibleAssetMetadata(FungibleAssetMetadata),
@@ -141,12 +184,12 @@ impl V2FungibleAssetResource {
     pub fn is_resource_supported(data_type: &str) -> bool {
-        matches!(
-            data_type,
-            "0x1::fungible_asset::Supply"
-                | "0x1::fungible_asset::Metadata"
-                | "0x1::fungible_asset::FungibleStore"
-        )
+        [
+            format!("{}::fungible_asset::Supply", COIN_ADDR),
+            format!("{}::fungible_asset::Metadata", COIN_ADDR),
+            format!("{}::fungible_asset::FungibleStore", COIN_ADDR),
+        ]
+        .contains(&data_type.to_string())
     }
 
     pub fn from_resource(
@@ -155,12 +198,18 @@ impl V2FungibleAssetResource {
         txn_version: i64,
     ) -> Result<Self> {
         match data_type {
-            "0x1::fungible_asset::Supply" => serde_json::from_value(data.clone())
-                .map(|inner| Some(Self::FungibleAssetSupply(inner))),
-            "0x1::fungible_asset::Metadata" => serde_json::from_value(data.clone())
-                .map(|inner| Some(Self::FungibleAssetMetadata(inner))),
-            "0x1::fungible_asset::FungibleStore" => serde_json::from_value(data.clone())
-                .map(|inner| Some(Self::FungibleAssetStore(inner))),
+            x if x == format!("{}::fungible_asset::Supply", COIN_ADDR) => {
+                serde_json::from_value(data.clone())
+                    .map(|inner| Some(Self::FungibleAssetSupply(inner)))
+            },
+            x if x == format!("{}::fungible_asset::Metadata", COIN_ADDR) => {
+                serde_json::from_value(data.clone())
+                    .map(|inner| Some(Self::FungibleAssetMetadata(inner)))
+            },
+            x if x == format!("{}::fungible_asset::FungibleStore", COIN_ADDR) => {
+                serde_json::from_value(data.clone())
+                    .map(|inner| Some(Self::FungibleAssetStore(inner)))
+            },
             _ => Ok(None),
         }
         .context(format!(
@@ -174,6 +223,29 @@
     }
 }
 
+pub enum FungibleAssetEvent {
+    DepositEvent(DepositEvent),
+    WithdrawEvent(WithdrawEvent),
+}
+
+impl FungibleAssetEvent {
+    pub fn from_event(data_type: &str, data: &str, txn_version: i64) -> Result<Option<Self>> {
+        match data_type {
+            "0x1::fungible_asset::DepositEvent" => {
+                serde_json::from_str(data).map(|inner| Some(Self::DepositEvent(inner)))
+            },
+            "0x1::fungible_asset::WithdrawEvent" => {
+                serde_json::from_str(data).map(|inner| Some(Self::WithdrawEvent(inner)))
+            },
+            _ => Ok(None),
+        }
+        .context(format!(
+            "version {} failed!
failed to parse type {}, data {:?}",
+            txn_version, data_type, data
+        ))
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
diff --git a/ecosystem/indexer-grpc/indexer-grpc-parser/src/models/token_models/v2_collections.rs b/ecosystem/indexer-grpc/indexer-grpc-parser/src/models/token_models/v2_collections.rs
index fb7781e07d06e..60696ae255432 100644
--- a/ecosystem/indexer-grpc/indexer-grpc-parser/src/models/token_models/v2_collections.rs
+++ b/ecosystem/indexer-grpc/indexer-grpc-parser/src/models/token_models/v2_collections.rs
@@ -6,6 +6,7 @@
 #![allow(clippy::unused_unit)]
 
 use super::{
+    collection_datas::{QUERY_RETRIES, QUERY_RETRY_DELAY_MS},
     token_utils::{CollectionDataIdType, TokenWriteSet},
     tokens::TableHandleToOwner,
     v2_token_utils::{TokenStandard, TokenV2AggregatedDataMapping, V2TokenResource},
@@ -25,9 +26,6 @@ use serde::{Deserialize, Serialize};
 // PK of current_collections_v2, i.e. collection_id
 pub type CurrentCollectionV2PK = String;
 
-const QUERY_RETRIES: u32 = 5;
-const QUERY_RETRY_DELAY_MS: u64 = 500;
-
 #[derive(Debug, Deserialize, FieldCount, Identifiable, Insertable, Serialize)]
 #[diesel(primary_key(transaction_version, write_set_change_index))]
 #[diesel(table_name = collections_v2)]
@@ -260,7 +258,7 @@ impl CollectionV2 {
     /// If collection data is not in resources of the same transaction, then try looking for it in the database. Since collection owner
     /// cannot change, we can just look in the current_collection_datas table.
     /// Retrying a few times since this collection could've been written in a separate thread.
-    pub fn get_collection_creator_for_v1(
+    fn get_collection_creator_for_v1(
         conn: &mut PgPoolConnection,
         table_handle: &str,
     ) -> anyhow::Result<String> {
@@ -278,7 +276,7 @@ impl CollectionV2 {
     }
 
     /// TODO: Change this to a KV store
-    pub fn get_by_table_handle(
+    fn get_by_table_handle(
         conn: &mut PgPoolConnection,
         table_handle: &str,
     ) -> anyhow::Result<String> {
diff --git a/ecosystem/indexer-grpc/indexer-grpc-parser/src/models/token_models/v2_token_activities.rs b/ecosystem/indexer-grpc/indexer-grpc-parser/src/models/token_models/v2_token_activities.rs
index c4fa8ac65cd03..8fb427744254e 100644
--- a/ecosystem/indexer-grpc/indexer-grpc-parser/src/models/token_models/v2_token_activities.rs
+++ b/ecosystem/indexer-grpc/indexer-grpc-parser/src/models/token_models/v2_token_activities.rs
@@ -7,9 +7,14 @@
 use super::{
     token_utils::{TokenDataIdType, TokenEvent},
+    v2_token_datas::TokenDataV2,
     v2_token_utils::{TokenStandard, TokenV2AggregatedDataMapping, V2TokenEvent},
 };
-use crate::{schema::token_activities_v2, utils::util::standardize_address};
+use crate::{
+    models::coin_models::v2_fungible_asset_utils::FungibleAssetEvent,
+    schema::token_activities_v2,
+    utils::{database::PgPoolConnection, util::standardize_address},
+};
 use aptos_protos::transaction::v1::Event;
 use bigdecimal::{BigDecimal, One, Zero};
 use field_count::FieldCount;
@@ -55,7 +60,85 @@ struct TokenActivityHelperV2 {
 }
 
 impl TokenActivityV2 {
-    pub fn get_v2_nft_from_parsed_event(
+    /// We'll start from the 0x1::fungible_asset withdraw/deposit events.
+    /// We're guaranteed to find a 0x1::fungible_asset::FungibleStore, which has a pointer to the
+    /// fungible asset metadata that could be a token. We'll find that token either in
+    /// token_v2_metadata or by looking it up in the postgres table.
+    /// TODO: Create artificial events for mint and burn. There are no mint and burn events, so we'll
+    /// have to sum the deposits/withdrawals; if the total is positive/negative it's a mint/burn.
+    pub fn get_ft_v2_from_parsed_event(
+        event: &Event,
+        txn_version: i64,
+        txn_timestamp: chrono::NaiveDateTime,
+        event_index: i64,
+        entry_function_id_str: &Option<String>,
+        token_v2_metadata: &TokenV2AggregatedDataMapping,
+        conn: &mut PgPoolConnection,
+    ) -> anyhow::Result<Option<Self>> {
+        let event_type = event.type_str.clone();
+        if let Some(fa_event) =
+            &FungibleAssetEvent::from_event(event_type.as_str(), &event.data, txn_version)?
+        {
+            let event_account_address =
+                standardize_address(&event.key.as_ref().unwrap().account_address);
+
+            // The event account address will also help us find the fungible store, which tells us
+            // where to find the metadata
+            if let Some(metadata) = token_v2_metadata.get(&event_account_address) {
+                let object_core = &metadata.object.object_core;
+                let fungible_asset = metadata.fungible_asset_store.as_ref().unwrap();
+                let maybe_token_data_id = fungible_asset.metadata.get_reference_address();
+                // Now we try to see if the fungible asset is actually a token. If it's not a token, return early
+                let is_token = if token_v2_metadata.get(&maybe_token_data_id).is_some() {
+                    true
+                } else {
+                    // Look up in the db
+                    TokenDataV2::is_address_token(conn, &maybe_token_data_id)?
+                };
+                if !is_token {
+                    return Ok(None);
+                }
+
+                let token_activity_helper = match fa_event {
+                    FungibleAssetEvent::WithdrawEvent(inner) => TokenActivityHelperV2 {
+                        from_address: Some(object_core.get_owner_address()),
+                        to_address: None,
+                        token_amount: inner.amount.clone(),
+                        before_value: None,
+                        after_value: None,
+                    },
+                    FungibleAssetEvent::DepositEvent(inner) => TokenActivityHelperV2 {
+                        from_address: None,
+                        to_address: Some(object_core.get_owner_address()),
+                        token_amount: inner.amount.clone(),
+                        before_value: None,
+                        after_value: None,
+                    },
+                };
+
+                return Ok(Some(Self {
+                    transaction_version: txn_version,
+                    event_index,
+                    event_account_address,
+                    token_data_id: maybe_token_data_id.clone(),
+                    property_version_v1: BigDecimal::zero(),
+                    type_: event_type.to_string(),
+                    from_address: token_activity_helper.from_address,
+                    to_address: token_activity_helper.to_address,
+                    token_amount: token_activity_helper.token_amount,
+                    before_value: token_activity_helper.before_value,
+                    after_value: token_activity_helper.after_value,
+                    entry_function_id_str: entry_function_id_str.clone(),
+                    token_standard: TokenStandard::V2.to_string(),
+                    is_fungible_v2: Some(true),
+                    transaction_timestamp: txn_timestamp,
+                }));
+            }
+        }
+        Ok(None)
+    }
+
+    pub fn get_nft_v2_from_parsed_event(
         event: &Event,
         txn_version: i64,
         txn_timestamp: chrono::NaiveDateTime,
diff --git a/ecosystem/indexer-grpc/indexer-grpc-parser/src/models/token_models/v2_token_datas.rs b/ecosystem/indexer-grpc/indexer-grpc-parser/src/models/token_models/v2_token_datas.rs
index 1bac819694579..c7eca35c1b09d 100644
--- a/ecosystem/indexer-grpc/indexer-grpc-parser/src/models/token_models/v2_token_datas.rs
+++ b/ecosystem/indexer-grpc/indexer-grpc-parser/src/models/token_models/v2_token_datas.rs
@@ -6,15 +6,18 @@
 #![allow(clippy::unused_unit)]
 
 use super::{
+    collection_datas::{QUERY_RETRIES, QUERY_RETRY_DELAY_MS},
     token_utils::TokenWriteSet,
     v2_token_utils::{TokenStandard, TokenV2, TokenV2AggregatedDataMapping},
 };
 use crate::{
     schema::{current_token_datas_v2, token_datas_v2},
-    utils::util::standardize_address,
+    utils::{database::PgPoolConnection, util::standardize_address},
 };
+use anyhow::Context;
 use aptos_protos::transaction::v1::{WriteResource, WriteTableItem};
 use bigdecimal::{BigDecimal, Zero};
+use diesel::{prelude::*, sql_query, sql_types::Text};
 use
field_count::FieldCount;
 use serde::{Deserialize, Serialize};
@@ -62,6 +65,12 @@ pub struct CurrentTokenDataV2 {
     pub decimals: i64,
 }
 
+#[derive(Debug, QueryableByName)]
+pub struct TokenDataIdFromTable {
+    #[diesel(sql_type = Text)]
+    pub token_data_id: String,
+}
+
 impl TokenDataV2 {
     pub fn get_v2_from_write_resource(
         write_resource: &WriteResource,
@@ -222,4 +231,34 @@ impl TokenDataV2 {
         }
         Ok(None)
     }
+
+    /// Try to see if an address is a token. We'll try a few times in case there is a race condition,
+    /// and if we can't find it after QUERY_RETRIES attempts, we'll assume that it's not a token.
+    /// TODO: An improvement is that we'll make another query to see if the address is a coin.
+    pub fn is_address_token(conn: &mut PgPoolConnection, address: &str) -> anyhow::Result<bool> {
+        let mut retried = 0;
+        while retried < QUERY_RETRIES {
+            retried += 1;
+            match Self::get_by_token_data_id(conn, address) {
+                Ok(_) => return Ok(true),
+                Err(_) => {
+                    std::thread::sleep(std::time::Duration::from_millis(QUERY_RETRY_DELAY_MS));
+                },
+            }
+        }
+        Ok(false)
+    }
+
+    /// TODO: Change this to a KV store
+    fn get_by_token_data_id(conn: &mut PgPoolConnection, address: &str) -> anyhow::Result<String> {
+        let mut res: Vec<Option<TokenDataIdFromTable>> =
+            sql_query("SELECT token_data_id FROM current_token_datas_v2 WHERE token_data_id = $1")
+                .bind::<Text, _>(address)
+                .get_results(conn)?;
+        Ok(res
+            .pop()
+            .context("token data result empty")?
+            .context("token data result null")?
+            .token_data_id)
+    }
 }
diff --git a/ecosystem/indexer-grpc/indexer-grpc-parser/src/models/token_models/v2_token_utils.rs b/ecosystem/indexer-grpc/indexer-grpc-parser/src/models/token_models/v2_token_utils.rs
index e81d8fc999427..f0aae5198b349 100644
--- a/ecosystem/indexer-grpc/indexer-grpc-parser/src/models/token_models/v2_token_utils.rs
+++ b/ecosystem/indexer-grpc/indexer-grpc-parser/src/models/token_models/v2_token_utils.rs
@@ -9,7 +9,9 @@ use crate::{
     models::{
         coin_models::{
             coin_utils::COIN_ADDR,
-            v2_fungible_asset_utils::{FungibleAssetMetadata, FungibleAssetSupply},
+            v2_fungible_asset_utils::{
+                FungibleAssetMetadata, FungibleAssetStore, FungibleAssetSupply,
+            },
         },
         default_models::{move_resources::MoveResource, v2_objects::CurrentObjectPK},
     },
@@ -37,12 +39,14 @@ pub type TokenV2Burned = HashSet<CurrentObjectPK>;
 /// Index of the event so that we can write its inverse to the db as primary key (to avoid collisions)
 pub type EventIndex = i64;
 
+/// This contains both metadata for fungible assets and fungible tokens
 #[derive(Serialize, Deserialize, Debug, Clone)]
 pub struct TokenV2AggregatedData {
     pub aptos_collection: Option<AptosCollection>,
     pub fixed_supply: Option<FixedSupply>,
     pub fungible_asset_metadata: Option<FungibleAssetMetadata>,
     pub fungible_asset_supply: Option<FungibleAssetSupply>,
+    pub fungible_asset_store: Option<FungibleAssetStore>,
     pub object: ObjectWithMetadata,
     pub property_map: Option<PropertyMap>,
     pub token: Option<TokenV2>,
diff --git a/ecosystem/indexer-grpc/indexer-grpc-parser/src/processors/token_processor.rs b/ecosystem/indexer-grpc/indexer-grpc-parser/src/processors/token_processor.rs
index 9067bf0935aa7..672aa4d038463 100644
--- a/ecosystem/indexer-grpc/indexer-grpc-parser/src/processors/token_processor.rs
+++ b/ecosystem/indexer-grpc/indexer-grpc-parser/src/processors/token_processor.rs
@@ -4,7 +4,9 @@
 use super::processor_trait::{ProcessingResult, ProcessorTrait};
 use crate::{
     models::{
-        coin_models::v2_fungible_asset_utils::{FungibleAssetMetadata, FungibleAssetSupply},
+        coin_models::v2_fungible_asset_utils::{
+            FungibleAssetMetadata, FungibleAssetStore, FungibleAssetSupply,
+        },
         token_models::{
             ans_lookup::{CurrentAnsLookup, CurrentAnsLookupPK},
             collection_datas::{CollectionData,
CurrentCollectionData},
@@ -1104,6 +1106,7 @@ fn parse_v2_token(
                         token: None,
                         fungible_asset_metadata: None,
                         fungible_asset_supply: None,
+                        fungible_asset_store: None,
                     },
                 );
             }
@@ -1149,6 +1152,11 @@ fn parse_v2_token(
                 {
                     aggregated_data.fungible_asset_supply = Some(fungible_asset_supply);
                 }
+                if let Some(fungible_asset_store) =
+                    FungibleAssetStore::from_write_resource(wr, txn_version).unwrap()
+                {
+                    aggregated_data.fungible_asset_store = Some(fungible_asset_store);
+                }
             }
         }
     }
@@ -1187,13 +1195,27 @@ fn parse_v2_token(
                 token_activities_v2.push(event);
             }
             // handling all the token v2 events
-            if let Some(event) = TokenActivityV2::get_v2_nft_from_parsed_event(
+            if let Some(event) = TokenActivityV2::get_nft_v2_from_parsed_event(
+                event,
+                txn_version,
+                txn_timestamp,
+                index as i64,
+                &entry_function_id_str,
+                &token_v2_metadata_helper,
+            )
+            .unwrap()
+            {
+                token_activities_v2.push(event);
+            }
+            // handling all the fungible asset v2 events
+            if let Some(event) = TokenActivityV2::get_ft_v2_from_parsed_event(
                 event,
                 txn_version,
                 txn_timestamp,
                 index as i64,
                 &entry_function_id_str,
                 &token_v2_metadata_helper,
+                conn,
             )
             .unwrap()
             {
diff --git a/ecosystem/indexer-grpc/indexer-grpc-parser/src/worker.rs b/ecosystem/indexer-grpc/indexer-grpc-parser/src/worker.rs
index 8d632f5710fc1..41e69baa0a246 100644
--- a/ecosystem/indexer-grpc/indexer-grpc-parser/src/worker.rs
+++ b/ecosystem/indexer-grpc/indexer-grpc-parser/src/worker.rs
@@ -40,7 +40,7 @@ pub struct Worker {
     pub db_pool: PgDbPool,
     pub processor_name: String,
     pub postgres_connection_string: String,
-    pub indexer_grpc_data_service_addresss: String,
+    pub indexer_grpc_data_service_address: String,
     pub indexer_grpc_http2_ping_interval: std::time::Duration,
     pub indexer_grpc_http2_ping_timeout: std::time::Duration,
     pub auth_token: String,
@@ -55,7 +55,7 @@ impl Worker {
     pub async fn new(
         processor_name: String,
         postgres_connection_string: String,
-        indexer_grpc_data_service_addresss: String,
+        indexer_grpc_data_service_address: String,
         indexer_grpc_http2_ping_interval: std::time::Duration,
         indexer_grpc_http2_ping_timeout: std::time::Duration,
         auth_token: String,
@@ -82,7 +82,7 @@ impl Worker {
             db_pool: conn_pool,
             processor_name,
             postgres_connection_string,
-            indexer_grpc_data_service_addresss,
+            indexer_grpc_data_service_address,
             indexer_grpc_http2_ping_interval,
             indexer_grpc_http2_ping_timeout,
             starting_version,
@@ -99,13 +99,13 @@ impl Worker {
         info!(
             processor_name = processor_name,
-            stream_address = self.indexer_grpc_data_service_addresss.clone(),
+            stream_address = self.indexer_grpc_data_service_address.clone(),
             "[Parser] Connecting to GRPC endpoint",
         );
         let channel = tonic::transport::Channel::from_shared(format!(
             "http://{}",
-            self.indexer_grpc_data_service_addresss.clone()
+            self.indexer_grpc_data_service_address.clone()
         ))
         .expect("[Parser] Endpoint is not a valid URI")
         .http2_keep_alive_interval(self.indexer_grpc_http2_ping_interval)
@@ -116,7 +116,7 @@
             Err(e) => {
                 error!(
                     processor_name = processor_name,
-                    stream_address = self.indexer_grpc_data_service_addresss.clone(),
+                    stream_address = self.indexer_grpc_data_service_address.clone(),
                     error = ?e,
                     "[Parser] Error connecting to grpc_stream"
                 );
@@ -125,7 +125,7 @@
         };
         info!(
             processor_name = processor_name,
-            stream_address = self.indexer_grpc_data_service_addresss.clone(),
+            stream_address = self.indexer_grpc_data_service_address.clone(),
             "[Parser] Connected to GRPC endpoint",
         );
@@ -157,7 +157,7 @@
         info!(
             processor_name = processor_name,
-            stream_address
= self.indexer_grpc_data_service_addresss.clone(), + stream_address = self.indexer_grpc_data_service_address.clone(), final_start_version = starting_version, start_version_from_config = self.starting_version, start_version_from_db = starting_version_from_db, @@ -181,7 +181,7 @@ impl Worker { let concurrent_tasks = self.number_concurrent_processing_tasks; info!( processor_name = processor_name, - stream_address = self.indexer_grpc_data_service_addresss.clone(), + stream_address = self.indexer_grpc_data_service_address.clone(), starting_version = starting_version, concurrent_tasks = concurrent_tasks, "[Parser] Successfully connected to GRPC endpoint. Now instantiating processor", @@ -324,7 +324,7 @@ impl Worker { Err(e) => { error!( processor_name = processor_name, - stream_address = self.indexer_grpc_data_service_addresss.clone(), + stream_address = self.indexer_grpc_data_service_address.clone(), error = ?e, "[Parser] Error processing transactions" ); @@ -350,7 +350,7 @@ impl Worker { if prev_end.unwrap() + 1 != start { error!( processor_name = processor_name, - stream_address = self.indexer_grpc_data_service_addresss.clone(), + stream_address = self.indexer_grpc_data_service_address.clone(), processed_versions = processed_versions_sorted .iter() .map(|(s, e)| format!("{}-{}", s, e)) diff --git a/storage/indexer/src/lib.rs b/storage/indexer/src/lib.rs index 9dd9a32954456..5260177da34cd 100644 --- a/storage/indexer/src/lib.rs +++ b/storage/indexer/src/lib.rs @@ -118,7 +118,19 @@ impl Indexer { } let mut batch = SchemaBatch::new(); - table_info_parser.finish(&mut batch)?; + match table_info_parser.finish(&mut batch) { + Ok(_) => {}, + Err(err) => { + aptos_logger::error!(first_version = first_version, end_version = end_version, error = ?&err); + write_sets + .iter() + .enumerate() + .for_each(|(i, write_set)| { + aptos_logger::error!(version = first_version as usize + i, write_set = ?write_set); + }); + bail!(err); + }, + }; batch.put::( &MetadataKey::LatestVersion, &MetadataValue::Version(end_version - 1),
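The storage-side hunk above swaps a bare `table_info_parser.finish(&mut batch)?` for a match that logs the failing version range, then every write set in that range, before propagating the error — the "better logs for storage indexer" half of the subject line. A self-contained sketch of that log-then-bail shape, using `parse_all`, `WriteSet`, and `eprintln!` as hypothetical stand-ins for the real `TableInfoParser` and `aptos_logger` machinery:

```rust
use anyhow::bail;

// Hypothetical stand-in for the real write set type in storage/indexer.
#[derive(Debug)]
struct WriteSet;

// Hypothetical stand-in for TableInfoParser::finish over a batch of write sets.
fn parse_all(_write_sets: &[WriteSet]) -> anyhow::Result<()> {
    Ok(())
}

fn index_range(first_version: u64, end_version: u64, write_sets: &[WriteSet]) -> anyhow::Result<()> {
    match parse_all(write_sets) {
        Ok(()) => {},
        Err(err) => {
            // Log the failing range once, then one line per version so the
            // offending write set can be located in the logs.
            eprintln!("first_version={first_version} end_version={end_version} error={err:?}");
            for (i, write_set) in write_sets.iter().enumerate() {
                eprintln!("version={} write_set={:?}", first_version + i as u64, write_set);
            }
            bail!(err);
        },
    };
    // ...on success the real code goes on to persist the latest-version metadata.
    Ok(())
}

fn main() -> anyhow::Result<()> {
    index_range(100, 102, &[WriteSet, WriteSet, WriteSet])
}
```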