Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

automerge #9

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
-- This file should undo anything in `up.sql`
-- Removes the v2 supply-tracking columns added by the companion up.sql,
-- restoring fungible_asset_metadata to its pre-migration shape.
-- NOTE(review): the NOT NULL relaxations made in up.sql on
-- current_token_datas_v2 / token_datas_v2 are intentionally NOT re-tightened
-- here — re-adding NOT NULL could fail if null rows were written meanwhile.
ALTER TABLE fungible_asset_metadata
DROP COLUMN supply_v2,
DROP COLUMN maximum_v2;
12 changes: 12 additions & 0 deletions rust/processor/migrations/2024-04-18-173631_fungible_token/up.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
-- Your SQL goes here
-- Track v2 fungible-asset supply/maximum directly on the metadata row.
-- Both columns are nullable NUMERIC: v1 coins have no v2 supply data.
ALTER TABLE fungible_asset_metadata
ADD COLUMN supply_v2 NUMERIC,
ADD COLUMN maximum_v2 NUMERIC;

-- Relax supply/decimals to nullable: not every v2 token carries these values.
ALTER TABLE current_token_datas_v2
ALTER COLUMN supply DROP NOT NULL,
ALTER COLUMN decimals DROP NOT NULL;

-- Same relaxation for the historical token_datas_v2 table.
ALTER TABLE token_datas_v2
ALTER COLUMN supply DROP NOT NULL,
ALTER COLUMN decimals DROP NOT NULL;
8 changes: 8 additions & 0 deletions rust/processor/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,9 @@ pub struct IndexerGrpcHttp2Config {

/// Indexer GRPC http2 ping timeout in seconds. Defaults to 10.
indexer_grpc_http2_ping_timeout_in_secs: u64,

/// Seconds before timeout for grpc connection.
indexer_grpc_connection_timeout_secs: u64,
}

impl IndexerGrpcHttp2Config {
Expand All @@ -124,13 +127,18 @@ impl IndexerGrpcHttp2Config {
/// Returns the configured HTTP/2 ping timeout as a [`Duration`]
/// (built from the `indexer_grpc_http2_ping_timeout_in_secs` field).
pub fn grpc_http2_ping_timeout_in_secs(&self) -> Duration {
Duration::from_secs(self.indexer_grpc_http2_ping_timeout_in_secs)
}

/// Returns the configured gRPC connection timeout as a [`Duration`]
/// (built from the `indexer_grpc_connection_timeout_secs` field).
pub fn grpc_connection_timeout_secs(&self) -> Duration {
Duration::from_secs(self.indexer_grpc_connection_timeout_secs)
}
}

impl Default for IndexerGrpcHttp2Config {
// Defaults mirror the field docs: 30s ping interval, 10s ping timeout,
// and a 5s connection timeout for establishing the gRPC channel.
fn default() -> Self {
Self {
indexer_grpc_http2_ping_interval_in_secs: 30,
indexer_grpc_http2_ping_timeout_in_secs: 10,
indexer_grpc_connection_timeout_secs: 5,
}
}
}
65 changes: 59 additions & 6 deletions rust/processor/src/grpc_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ pub async fn get_stream(
indexer_grpc_data_service_address: Url,
indexer_grpc_http2_ping_interval: Duration,
indexer_grpc_http2_ping_timeout: Duration,
indexer_grpc_reconnection_timeout_secs: Duration,
starting_version: u64,
ending_version: Option<u64>,
auth_token: String,
Expand Down Expand Up @@ -121,7 +122,7 @@ pub async fn get_stream(
let mut connect_retries = 0;
let connect_res = loop {
let res = timeout(
Duration::from_secs(5),
indexer_grpc_reconnection_timeout_secs,
RawDataClient::connect(channel.clone()),
)
.await;
Expand Down Expand Up @@ -177,17 +178,65 @@ pub async fn get_stream(
num_of_transactions = ?count,
"[Parser] Setting up GRPC stream",
);
let request = grpc_request_builder(starting_version, count, auth_token, processor_name);
rpc_client
.get_transactions(request)
.await
.expect("[Parser] Failed to get grpc response. Is the server running?")

// TODO: move this to a config file
// Retry this connection a few times before giving up
let mut connect_retries = 0;
let stream_res = loop {
let timeout_res = timeout(indexer_grpc_reconnection_timeout_secs, async {
let request = grpc_request_builder(
starting_version,
count,
auth_token.clone(),
processor_name.clone(),
);
rpc_client.get_transactions(request).await
})
.await;
match timeout_res {
Ok(client) => break Ok(client),
Err(e) => {
error!(
processor_name = processor_name,
service_type = crate::worker::PROCESSOR_SERVICE_TYPE,
stream_address = indexer_grpc_data_service_address.to_string(),
start_version = starting_version,
end_version = ending_version,
retries = connect_retries,
error = ?e,
"[Parser] Timeout making grpc request. Retrying...",
);
connect_retries += 1;
if connect_retries >= RECONNECTION_MAX_RETRIES {
break Err(e);
}
},
}
}
.expect("[Parser] Timed out making grpc request after max retries.");

match stream_res {
Ok(stream) => stream,
Err(e) => {
error!(
processor_name = processor_name,
service_type = crate::worker::PROCESSOR_SERVICE_TYPE,
stream_address = indexer_grpc_data_service_address.to_string(),
start_version = starting_version,
ending_version = ending_version,
error = ?e,
"[Parser] Failed to get grpc response. Is the server running?"
);
panic!("[Parser] Failed to get grpc response. Is the server running?");
},
}
}

pub async fn get_chain_id(
indexer_grpc_data_service_address: Url,
indexer_grpc_http2_ping_interval: Duration,
indexer_grpc_http2_ping_timeout: Duration,
indexer_grpc_reconnection_timeout_secs: Duration,
auth_token: String,
processor_name: String,
) -> u64 {
Expand All @@ -201,6 +250,7 @@ pub async fn get_chain_id(
indexer_grpc_data_service_address.clone(),
indexer_grpc_http2_ping_interval,
indexer_grpc_http2_ping_timeout,
indexer_grpc_reconnection_timeout_secs,
1,
Some(2),
auth_token.clone(),
Expand Down Expand Up @@ -257,6 +307,7 @@ pub async fn create_fetcher_loop(
indexer_grpc_data_service_address: Url,
indexer_grpc_http2_ping_interval: Duration,
indexer_grpc_http2_ping_timeout: Duration,
indexer_grpc_reconnection_timeout_secs: Duration,
starting_version: u64,
request_ending_version: Option<u64>,
auth_token: String,
Expand All @@ -277,6 +328,7 @@ pub async fn create_fetcher_loop(
indexer_grpc_data_service_address.clone(),
indexer_grpc_http2_ping_interval,
indexer_grpc_http2_ping_timeout,
indexer_grpc_reconnection_timeout_secs,
starting_version,
request_ending_version,
auth_token.clone(),
Expand Down Expand Up @@ -587,6 +639,7 @@ pub async fn create_fetcher_loop(
indexer_grpc_data_service_address.clone(),
indexer_grpc_http2_ping_interval,
indexer_grpc_http2_ping_timeout,
indexer_grpc_reconnection_timeout_secs,
next_version_to_fetch,
request_ending_version,
auth_token.clone(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,7 @@
#![allow(clippy::extra_unused_lifetimes)]
#![allow(clippy::unused_unit)]

use super::{
v2_fungible_asset_utils::{FeeStatement, FungibleAssetEvent},
v2_fungible_metadata::FungibleAssetMetadataModel,
};
use super::v2_fungible_asset_utils::{FeeStatement, FungibleAssetEvent};
use crate::{
models::{
coin_models::{
Expand All @@ -19,7 +16,7 @@ use crate::{
token_v2_models::v2_token_utils::TokenStandard,
},
schema::fungible_asset_activities,
utils::{database::PgPoolConnection, util::standardize_address},
utils::util::standardize_address,
};
use ahash::AHashMap;
use anyhow::Context;
Expand Down Expand Up @@ -70,7 +67,6 @@ impl FungibleAssetActivity {
event_index: i64,
entry_function_id_str: &Option<String>,
object_aggregated_data_mapping: &ObjectAggregatedDataMapping,
conn: &mut PgPoolConnection<'_>,
) -> anyhow::Result<Option<Self>> {
let event_type = event.type_str.clone();
if let Some(fa_event) =
Expand All @@ -84,17 +80,6 @@ impl FungibleAssetActivity {
let object_core = &object_metadata.object.object_core;
let fungible_asset = object_metadata.fungible_asset_store.as_ref().unwrap();
let asset_type = fungible_asset.metadata.get_reference_address();
// If it's a fungible token, return early
if !FungibleAssetMetadataModel::is_address_fungible_asset(
conn,
&asset_type,
object_aggregated_data_mapping,
txn_version,
)
.await
{
return Ok(None);
}

let (is_frozen, amount) = match fa_event {
FungibleAssetEvent::WithdrawEvent(inner) => (None, Some(inner.amount.clone())),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@

use super::{
v2_fungible_asset_activities::EventToCoinType, v2_fungible_asset_utils::FungibleAssetStore,
v2_fungible_metadata::FungibleAssetMetadataModel,
};
use crate::{
models::{
Expand All @@ -16,7 +15,7 @@ use crate::{
token_v2_models::v2_token_utils::TokenStandard,
},
schema::{current_fungible_asset_balances, fungible_asset_balances},
utils::{database::PgPoolConnection, util::standardize_address},
utils::util::standardize_address,
};
use ahash::AHashMap;
use aptos_protos::transaction::v1::WriteResource;
Expand Down Expand Up @@ -71,7 +70,6 @@ impl FungibleAssetBalance {
txn_version: i64,
txn_timestamp: chrono::NaiveDateTime,
object_metadatas: &ObjectAggregatedDataMapping,
conn: &mut PgPoolConnection<'_>,
) -> anyhow::Result<Option<(Self, CurrentFungibleAssetBalance)>> {
if let Some(inner) = &FungibleAssetStore::from_write_resource(write_resource, txn_version)?
{
Expand All @@ -81,17 +79,6 @@ impl FungibleAssetBalance {
let object = &object_data.object.object_core;
let owner_address = object.get_owner_address();
let asset_type = inner.metadata.get_reference_address();
// If it's a fungible token, return early
if !FungibleAssetMetadataModel::is_address_fungible_asset(
conn,
&asset_type,
object_metadatas,
txn_version,
)
.await
{
return Ok(None);
}
let is_primary = Self::is_primary(&owner_address, &asset_type, &storage_id);

let coin_balance = Self {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,12 @@ use crate::{
token_v2_models::v2_token_utils::TokenStandard,
},
schema::fungible_asset_metadata,
utils::{database::PgPoolConnection, util::standardize_address},
utils::util::standardize_address,
};
use ahash::AHashMap;
use aptos_protos::transaction::v1::WriteResource;
use bigdecimal::BigDecimal;
use diesel::prelude::*;
use diesel_async::RunQueryDsl;
use field_count::FieldCount;
use serde::{Deserialize, Serialize};

Expand All @@ -44,26 +44,8 @@ pub struct FungibleAssetMetadataModel {
pub supply_aggregator_table_key_v1: Option<String>,
pub token_standard: String,
pub is_token_v2: Option<bool>,
}

#[derive(Debug, Deserialize, Identifiable, Queryable, Serialize)]
#[diesel(primary_key(asset_type))]
#[diesel(table_name = fungible_asset_metadata)]
pub struct FungibleAssetMetadataQuery {
pub asset_type: String,
pub creator_address: String,
pub name: String,
pub symbol: String,
pub decimals: i32,
pub icon_uri: Option<String>,
pub project_uri: Option<String>,
pub last_transaction_version: i64,
pub last_transaction_timestamp: chrono::NaiveDateTime,
pub supply_aggregator_table_handle_v1: Option<String>,
pub supply_aggregator_table_key_v1: Option<String>,
pub token_standard: String,
pub inserted_at: chrono::NaiveDateTime,
pub is_token_v2: Option<bool>,
pub supply_v2: Option<BigDecimal>,
pub maximum_v2: Option<BigDecimal>,
}

impl FungibleAssetMetadataModel {
Expand All @@ -81,7 +63,16 @@ impl FungibleAssetMetadataModel {
let asset_type = standardize_address(&write_resource.address.to_string());
if let Some(object_metadata) = object_metadatas.get(&asset_type) {
let object = &object_metadata.object.object_core;
let is_token_v2 = object_metadata.token.is_some();
let fungible_asset_supply = object_metadata.fungible_asset_supply.as_ref();
let (maximum_v2, supply_v2) =
if let Some(fungible_asset_supply) = fungible_asset_supply {
(
fungible_asset_supply.get_maximum(),
Some(fungible_asset_supply.current.clone()),
)
} else {
(None, None)
};

return Ok(Some(Self {
asset_type: asset_type.clone(),
Expand All @@ -96,7 +87,9 @@ impl FungibleAssetMetadataModel {
supply_aggregator_table_handle_v1: None,
supply_aggregator_table_key_v1: None,
token_standard: TokenStandard::V2.to_string(),
is_token_v2: Some(is_token_v2),
is_token_v2: None,
supply_v2,
maximum_v2,
}));
}
}
Expand Down Expand Up @@ -135,7 +128,9 @@ impl FungibleAssetMetadataModel {
supply_aggregator_table_handle_v1: supply_aggregator_table_handle,
supply_aggregator_table_key_v1: supply_aggregator_table_key,
token_standard: TokenStandard::V1.to_string(),
is_token_v2: Some(false),
is_token_v2: None,
supply_v2: None,
maximum_v2: None,
}))
} else {
Ok(None)
Expand All @@ -144,54 +139,4 @@ impl FungibleAssetMetadataModel {
_ => Ok(None),
}
}

/// A fungible asset can also be a token. We will make a best effort guess at whether this is a fungible token.
/// 1. If metadata is present without token object, then it's not a token
/// 2. If metadata is not present, we will do a lookup in the db.
///
/// Returns `true` when the address is judged to be a plain fungible asset
/// (i.e. NOT a token); `false` when it looks like a fungible token.
pub async fn is_address_fungible_asset(
conn: &mut PgPoolConnection<'_>,
asset_type: &str,
object_aggregated_data_mapping: &ObjectAggregatedDataMapping,
txn_version: i64,
) -> bool {
// 1. If metadata is present without token object, then it's not a token.
// Only decide from the in-memory mapping when fungible_asset_metadata is
// present; otherwise fall through to the DB lookup below.
if let Some(object_data) = object_aggregated_data_mapping.get(asset_type) {
if object_data.fungible_asset_metadata.is_some() {
return object_data.token.is_none();
}
}
// 2. If metadata is not present, we will do a lookup in the db.
match FungibleAssetMetadataQuery::get_by_asset_type(conn, asset_type).await {
Ok(metadata) => {
if let Some(is_token_v2) = metadata.is_token_v2 {
return !is_token_v2;
}

// If is_token_v2 is null, then the metadata is a v1 coin info, and it's not a token
true
},
Err(_) => {
// Best-effort default: a missing metadata row is logged (it likely
// needs a backfill) but treated as a fungible asset rather than an
// error, so processing can continue.
tracing::error!(
transaction_version = txn_version,
lookup_key = asset_type,
"Missing fungible_asset_metadata for asset_type: {}. You probably should backfill db.",
asset_type,
);
// Default
true
},
}
}
}

impl FungibleAssetMetadataQuery {
/// Fetches the metadata row keyed by `asset_type` (the table's primary
/// key) from `fungible_asset_metadata`.
///
/// Returns `Err(diesel::result::Error::NotFound)` when no row matches,
/// per diesel's `first` semantics.
pub async fn get_by_asset_type(
conn: &mut PgPoolConnection<'_>,
asset_type: &str,
) -> diesel::QueryResult<Self> {
fungible_asset_metadata::table
.filter(fungible_asset_metadata::asset_type.eq(asset_type))
.first::<Self>(conn)
.await
}
}
Loading
Loading