Skip to content

Commit

Permalink
add indexer grpc, add better logs for storage indexer
Browse files Browse the repository at this point in the history
  • Loading branch information
bowenyang007 committed Jul 14, 2023
1 parent 5419a7c commit 9157f6f
Show file tree
Hide file tree
Showing 16 changed files with 306 additions and 43 deletions.
2 changes: 1 addition & 1 deletion crates/indexer/src/models/token_models/v2_collections.rs
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,7 @@ impl CollectionV2 {
}

/// TODO: Change this to a KV store
pub fn get_by_table_handle(
fn get_by_table_handle(
conn: &mut PgPoolConnection,
table_handle: &str,
) -> anyhow::Result<String> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ impl TokenActivityV2 {
let fungible_asset = metadata.fungible_asset_store.as_ref().unwrap();
let maybe_token_data_id = fungible_asset.metadata.get_reference_address();
// Now we try to see if the fungible asset is actually a token. If it's not token, return early
let is_token = if let Some(_) = token_v2_metadata.get(&maybe_token_data_id) {
let is_token = if token_v2_metadata.get(&maybe_token_data_id).is_some() {
true
} else {
// Look up in the db
Expand Down
4 changes: 3 additions & 1 deletion crates/indexer/src/models/token_models/v2_token_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@
use super::token_utils::{NAME_LENGTH, URI_LENGTH};
use crate::{
models::{
coin_models::v2_fungible_asset_utils::{FungibleAssetMetadata, FungibleAssetSupply, FungibleAssetStore},
coin_models::v2_fungible_asset_utils::{
FungibleAssetMetadata, FungibleAssetStore, FungibleAssetSupply,
},
move_resources::MoveResource,
v2_objects::CurrentObjectPK,
},
Expand Down
4 changes: 2 additions & 2 deletions ecosystem/indexer-grpc/indexer-grpc-parser/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ Indexer GRPC parser is to indexer data processor that leverages the indexer grpc
server_config:
processor_name: default_processor
postgres_connection_string: postgresql://postgres:@localhost:5432/postgres_v2
indexer_grpc_data_service_addresss: 127.0.0.1:50051
indexer_grpc_data_service_address: 127.0.0.1:50051
indexer_grpc_http2_ping_interval_in_secs: 60
indexer_grpc_http2_ping_timeout_in_secs: 10
auth_token: AUTH_TOKEN
Expand All @@ -26,7 +26,7 @@ Indexer GRPC parser is to indexer data processor that leverages the indexer grpc
* `processor_name`: purpose of this processor; also used for monitoring purpose.
* `postgres_connection_string`: PostgresQL DB connection string
* `indexer_grpc_data_service_addresss`: Data service non-TLS endpoint address.
* `indexer_grpc_data_service_address`: Data service non-TLS endpoint address.
* `indexer_grpc_http2_ping_interval_in_secs`: client-side grpc HTTP2 ping interval.
* `indexer_grpc_http2_ping_timeout_in_secs`: client-side grpc HTTP2 ping timeout.
* `auth_token`: Auth token used for connection.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
-- This file should undo anything in `up.sql`
DROP INDEX IF EXISTS mr_ver_index;
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
-- Your SQL goes here
-- This is needed to improve performance when querying an account with a large number of transactions
CREATE INDEX IF NOT EXISTS mr_ver_index ON move_resources(transaction_version DESC);
-- These are needed b/c for some reason we're getting build errors when setting
-- type field with a length limit
ALTER TABLE signatures
ALTER COLUMN type TYPE VARCHAR;
ALTER TABLE token_activities_v2
ALTER COLUMN type TYPE VARCHAR;
DROP VIEW IF EXISTS transactions_view;
ALTER TABLE transactions
ALTER COLUMN type TYPE VARCHAR;
CREATE VIEW transactions_view AS
SELECT "version",
block_height,
"hash",
"type",
payload#>>'{}' AS json_payload,
state_change_hash,
event_root_hash,
state_checkpoint_hash,
gas_used,
success,
vm_status,
accumulator_root_hash,
num_events,
num_write_set_changes,
inserted_at
FROM transactions;
2 changes: 1 addition & 1 deletion ecosystem/indexer-grpc/indexer-grpc-parser/parser.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,5 @@ health_check_port: 8084
server_config:
processor_name: default_processor
postgres_connection_string: postgresql://postgres:@localhost:5432/postgres_v2
indexer_grpc_data_service_addresss: 127.0.0.1:50051:50051
indexer_grpc_data_service_address: 127.0.0.1:50051:50051
auth_token: AUTH_TOKEN
4 changes: 2 additions & 2 deletions ecosystem/indexer-grpc/indexer-grpc-parser/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ pub struct IndexerGrpcProcessorConfig {
pub processor_name: String,
pub postgres_connection_string: String,
// TODO: add tls support.
pub indexer_grpc_data_service_addresss: String,
pub indexer_grpc_data_service_address: String,
// Indexer GRPC http2 ping interval in seconds; default to 30.
// tonic ref: https://docs.rs/tonic/latest/tonic/transport/channel/struct.Endpoint.html#method.http2_keep_alive_interval
pub indexer_grpc_http2_ping_interval_in_secs: Option<u64>,
Expand All @@ -33,7 +33,7 @@ impl RunnableConfig for IndexerGrpcProcessorConfig {
let mut worker = Worker::new(
self.processor_name.clone(),
self.postgres_connection_string.clone(),
self.indexer_grpc_data_service_addresss.clone(),
self.indexer_grpc_data_service_address.clone(),
std::time::Duration::from_secs(
self.indexer_grpc_http2_ping_interval_in_secs.unwrap_or(30),
),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
// This is required because a diesel macro makes clippy sad
#![allow(clippy::extra_unused_lifetimes)]

use super::coin_utils::COIN_ADDR;
use crate::{
models::{
default_models::move_resources::MoveResource,
Expand Down Expand Up @@ -83,6 +84,36 @@ pub struct FungibleAssetStore {
pub frozen: bool,
}

impl FungibleAssetStore {
pub fn from_write_resource(
write_resource: &WriteResource,
txn_version: i64,
) -> anyhow::Result<Option<Self>> {
let type_str = MoveResource::get_outer_type_from_resource(write_resource);
if !V2FungibleAssetResource::is_resource_supported(type_str.as_str()) {
return Ok(None);
}
let resource = MoveResource::from_write_resource(
write_resource,
0, // Placeholder, this isn't used anyway
txn_version,
0, // Placeholder, this isn't used anyway
);

if let V2FungibleAssetResource::FungibleAssetStore(inner) =
V2FungibleAssetResource::from_resource(
&type_str,
resource.data.as_ref().unwrap(),
txn_version,
)?
{
Ok(Some(inner))
} else {
Ok(None)
}
}
}

#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct FungibleAssetSupply {
#[serde(deserialize_with = "deserialize_from_string")]
Expand Down Expand Up @@ -132,6 +163,18 @@ impl FungibleAssetSupply {
}
}

#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct DepositEvent {
#[serde(deserialize_with = "deserialize_from_string")]
pub amount: BigDecimal,
}

#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct WithdrawEvent {
#[serde(deserialize_with = "deserialize_from_string")]
pub amount: BigDecimal,
}

#[derive(Serialize, Deserialize, Debug, Clone)]
pub enum V2FungibleAssetResource {
FungibleAssetMetadata(FungibleAssetMetadata),
Expand All @@ -141,12 +184,12 @@ pub enum V2FungibleAssetResource {

impl V2FungibleAssetResource {
pub fn is_resource_supported(data_type: &str) -> bool {
matches!(
data_type,
"0x1::fungible_asset::Supply"
| "0x1::fungible_asset::Metadata"
| "0x1::fungible_asset::FungibleStore"
)
[
format!("{}::fungible_asset::Supply", COIN_ADDR),
format!("{}::fungible_asset::Metadata", COIN_ADDR),
format!("{}::fungible_asset::FungibleStore", COIN_ADDR),
]
.contains(&data_type.to_string())
}

pub fn from_resource(
Expand All @@ -155,12 +198,18 @@ impl V2FungibleAssetResource {
txn_version: i64,
) -> Result<Self> {
match data_type {
"0x1::fungible_asset::Supply" => serde_json::from_value(data.clone())
.map(|inner| Some(Self::FungibleAssetSupply(inner))),
"0x1::fungible_asset::Metadata" => serde_json::from_value(data.clone())
.map(|inner| Some(Self::FungibleAssetMetadata(inner))),
"0x1::fungible_asset::FungibleStore" => serde_json::from_value(data.clone())
.map(|inner| Some(Self::FungibleAssetStore(inner))),
x if x == format!("{}::fungible_asset::Supply", COIN_ADDR) => {
serde_json::from_value(data.clone())
.map(|inner| Some(Self::FungibleAssetSupply(inner)))
},
x if x == format!("{}::fungible_asset::Metadata", COIN_ADDR) => {
serde_json::from_value(data.clone())
.map(|inner| Some(Self::FungibleAssetMetadata(inner)))
},
x if x == format!("{}::fungible_asset::FungibleStore", COIN_ADDR) => {
serde_json::from_value(data.clone())
.map(|inner| Some(Self::FungibleAssetStore(inner)))
},
_ => Ok(None),
}
.context(format!(
Expand All @@ -174,6 +223,29 @@ impl V2FungibleAssetResource {
}
}

pub enum FungibleAssetEvent {
DepositEvent(DepositEvent),
WithdrawEvent(WithdrawEvent),
}

impl FungibleAssetEvent {
pub fn from_event(data_type: &str, data: &str, txn_version: i64) -> Result<Option<Self>> {
match data_type {
"0x1::fungible_asset::DepositEvent" => {
serde_json::from_str(data).map(|inner| Some(Self::DepositEvent(inner)))
},
"0x1::fungible_asset::WithdrawEvent" => {
serde_json::from_str(data).map(|inner| Some(Self::WithdrawEvent(inner)))
},
_ => Ok(None),
}
.context(format!(
"version {} failed! failed to parse type {}, data {:?}",
txn_version, data_type, data
))
}
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#![allow(clippy::unused_unit)]

use super::{
collection_datas::{QUERY_RETRIES, QUERY_RETRY_DELAY_MS},
token_utils::{CollectionDataIdType, TokenWriteSet},
tokens::TableHandleToOwner,
v2_token_utils::{TokenStandard, TokenV2AggregatedDataMapping, V2TokenResource},
Expand All @@ -25,9 +26,6 @@ use serde::{Deserialize, Serialize};
// PK of current_collections_v2, i.e. collection_id
pub type CurrentCollectionV2PK = String;

const QUERY_RETRIES: u32 = 5;
const QUERY_RETRY_DELAY_MS: u64 = 500;

#[derive(Debug, Deserialize, FieldCount, Identifiable, Insertable, Serialize)]
#[diesel(primary_key(transaction_version, write_set_change_index))]
#[diesel(table_name = collections_v2)]
Expand Down Expand Up @@ -260,7 +258,7 @@ impl CollectionV2 {
/// If collection data is not in resources of the same transaction, then try looking for it in the database. Since collection owner
/// cannot change, we can just look in the current_collection_datas table.
/// Retrying a few times since this collection could've been written in a separate thread.
pub fn get_collection_creator_for_v1(
fn get_collection_creator_for_v1(
conn: &mut PgPoolConnection,
table_handle: &str,
) -> anyhow::Result<String> {
Expand All @@ -278,7 +276,7 @@ impl CollectionV2 {
}

/// TODO: Change this to a KV store
pub fn get_by_table_handle(
fn get_by_table_handle(
conn: &mut PgPoolConnection,
table_handle: &str,
) -> anyhow::Result<String> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,14 @@

use super::{
token_utils::{TokenDataIdType, TokenEvent},
v2_token_datas::TokenDataV2,
v2_token_utils::{TokenStandard, TokenV2AggregatedDataMapping, V2TokenEvent},
};
use crate::{schema::token_activities_v2, utils::util::standardize_address};
use crate::{
models::coin_models::v2_fungible_asset_utils::FungibleAssetEvent,
schema::token_activities_v2,
utils::{database::PgPoolConnection, util::standardize_address},
};
use aptos_protos::transaction::v1::Event;
use bigdecimal::{BigDecimal, One, Zero};
use field_count::FieldCount;
Expand Down Expand Up @@ -55,7 +60,85 @@ struct TokenActivityHelperV2 {
}

impl TokenActivityV2 {
pub fn get_v2_nft_from_parsed_event(
/// We'll go from 0x1::fungible_asset::withdraw/deposit events.
/// We're guaranteed to find a 0x1::fungible_asset::FungibleStore which has a pointer to the
/// fungible asset metadata which could be a token. We'll either find that token in token_v2_metadata
/// or by looking up the postgres table.
/// TODO: Create artificial events for mint and burn. There are no mint and burn events so we'll have to
/// add all the deposits/withdrawals and if it's positive/negative it's a mint/burn.
pub fn get_ft_v2_from_parsed_event(
event: &Event,
txn_version: i64,
txn_timestamp: chrono::NaiveDateTime,
event_index: i64,
entry_function_id_str: &Option<String>,
token_v2_metadata: &TokenV2AggregatedDataMapping,
conn: &mut PgPoolConnection,
) -> anyhow::Result<Option<Self>> {
let event_type = event.type_str.clone();
if let Some(fa_event) =
&FungibleAssetEvent::from_event(event_type.as_str(), &event.data, txn_version)?
{
let event_account_address =
standardize_address(&event.key.as_ref().unwrap().account_address);

// The event account address will also help us find fungible store which tells us where to find
// the metadata
if let Some(metadata) = token_v2_metadata.get(&event_account_address) {
let object_core = &metadata.object.object_core;
let fungible_asset = metadata.fungible_asset_store.as_ref().unwrap();
let maybe_token_data_id = fungible_asset.metadata.get_reference_address();
// Now we try to see if the fungible asset is actually a token. If it's not token, return early
let is_token = if token_v2_metadata.get(&maybe_token_data_id).is_some() {
true
} else {
// Look up in the db
TokenDataV2::is_address_token(conn, &maybe_token_data_id)?
};
if !is_token {
return Ok(None);
}

let token_activity_helper = match fa_event {
FungibleAssetEvent::WithdrawEvent(inner) => TokenActivityHelperV2 {
from_address: Some(object_core.get_owner_address()),
to_address: None,
token_amount: inner.amount.clone(),
before_value: None,
after_value: None,
},
FungibleAssetEvent::DepositEvent(inner) => TokenActivityHelperV2 {
from_address: None,
to_address: Some(object_core.get_owner_address()),
token_amount: inner.amount.clone(),
before_value: None,
after_value: None,
},
};

return Ok(Some(Self {
transaction_version: txn_version,
event_index,
event_account_address,
token_data_id: maybe_token_data_id.clone(),
property_version_v1: BigDecimal::zero(),
type_: event_type.to_string(),
from_address: token_activity_helper.from_address,
to_address: token_activity_helper.to_address,
token_amount: token_activity_helper.token_amount,
before_value: token_activity_helper.before_value,
after_value: token_activity_helper.after_value,
entry_function_id_str: entry_function_id_str.clone(),
token_standard: TokenStandard::V2.to_string(),
is_fungible_v2: Some(true),
transaction_timestamp: txn_timestamp,
}));
}
}
Ok(None)
}

pub fn get_nft_v2_from_parsed_event(
event: &Event,
txn_version: i64,
txn_timestamp: chrono::NaiveDateTime,
Expand Down
Loading

0 comments on commit 9157f6f

Please sign in to comment.