Skip to content

Commit

Permalink
add parquet_fungible_asset_processor (#456)
Browse files Browse the repository at this point in the history
* add parquet_fungible_asset_processor

* lint
  • Loading branch information
yuunlimm authored Jul 12, 2024
1 parent 08973de commit ed72b46
Show file tree
Hide file tree
Showing 9 changed files with 669 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,7 @@ pub mod v2_fungible_asset_activities;
pub mod v2_fungible_asset_balances;
pub mod v2_fungible_asset_utils;
pub mod v2_fungible_metadata;

// parquet models
pub mod parquet_coin_supply;
pub mod parquet_v2_fungible_asset_balances;
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
// Copyright © Aptos Foundation
// SPDX-License-Identifier: Apache-2.0

// This is required because a diesel macro makes clippy sad
#![allow(clippy::extra_unused_lifetimes)]
#![allow(clippy::unused_unit)]

use crate::{
bq_analytics::generic_parquet_processor::{GetTimeStamp, HasVersion, NamedTable},
db::common::models::default_models::move_tables::TableItem,
utils::util::{hash_str, APTOS_COIN_TYPE_STR},
};
use allocative_derive::Allocative;
use anyhow::Context;
use aptos_protos::transaction::v1::WriteTableItem;
use bigdecimal::BigDecimal;
use field_count::FieldCount;
use parquet_derive::ParquetRecordWriter;
use serde::{Deserialize, Serialize};

const APTOS_COIN_SUPPLY_TABLE_HANDLE: &str =
"0x1b854694ae746cdbd8d44186ca4929b2b337df21d1c74633be19b2710552fdca";
const APTOS_COIN_SUPPLY_TABLE_KEY: &str =
"0x619dc29a0aac8fa146714058e8dd6d2d0f3bdf5f6331907bf91f3acd81e6935";

#[derive(
Allocative, Clone, Debug, Default, Deserialize, FieldCount, ParquetRecordWriter, Serialize,
)]
pub struct CoinSupply {
pub txn_version: i64,
pub coin_type_hash: String,
pub coin_type: String,
pub supply: String, // it is a string representation of the u128
#[allocative(skip)]
pub block_timestamp: chrono::NaiveDateTime,
}

impl NamedTable for CoinSupply {
const TABLE_NAME: &'static str = "coin_supply";
}

impl HasVersion for CoinSupply {
fn version(&self) -> i64 {
self.txn_version
}
}

impl GetTimeStamp for CoinSupply {
fn get_timestamp(&self) -> chrono::NaiveDateTime {
self.block_timestamp
}
}

impl CoinSupply {
/// Currently only supports aptos_coin. Aggregator table detail is in CoinInfo which for aptos coin appears during genesis.
/// We query for the aggregator table details (handle and key) once upon indexer initiation and use it to fetch supply.
pub fn from_write_table_item(
write_table_item: &WriteTableItem,
txn_version: i64,
txn_timestamp: chrono::NaiveDateTime,
) -> anyhow::Result<Option<Self>> {
if let Some(data) = &write_table_item.data {
// Return early if not aggregator table type
if !(data.key_type == "address" && data.value_type == "u128") {
return Ok(None);
}
// Return early if not aggregator table handle
if write_table_item.handle.as_str() != APTOS_COIN_SUPPLY_TABLE_HANDLE {
return Ok(None);
}

// Convert to TableItem model. Some fields are just placeholders
let (table_item_model, _) =
TableItem::from_write_table_item(write_table_item, 0, txn_version, 0);

// Return early if not aptos coin aggregator key
let table_key = table_item_model.decoded_key.as_str().unwrap();
if table_key != APTOS_COIN_SUPPLY_TABLE_KEY {
return Ok(None);
}
// Everything matches. Get the coin supply
let supply = table_item_model
.decoded_value
.as_ref()
.unwrap()
.as_str()
.unwrap()
.parse::<BigDecimal>()
.context(format!(
"cannot parse string as u128: {:?}, version {}",
table_item_model.decoded_value.as_ref(),
txn_version
))?;
return Ok(Some(Self {
txn_version,
coin_type_hash: hash_str(APTOS_COIN_TYPE_STR),
coin_type: APTOS_COIN_TYPE_STR.to_string(),
supply: supply.to_string(),
block_timestamp: txn_timestamp,
}));
}
Ok(None)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,245 @@
// Copyright © Aptos Foundation
// SPDX-License-Identifier: Apache-2.0

// This is required because a diesel macro makes clippy sad
#![allow(clippy::extra_unused_lifetimes)]
#![allow(clippy::unused_unit)]

use crate::{
bq_analytics::generic_parquet_processor::{GetTimeStamp, HasVersion, NamedTable},
db::common::models::{
coin_models::coin_utils::{CoinInfoType, CoinResource},
fungible_asset_models::{
v2_fungible_asset_activities::EventToCoinType,
v2_fungible_asset_balances::{
get_primary_fungible_store_address, CurrentFungibleAssetBalance,
},
v2_fungible_asset_utils::FungibleAssetStore,
},
object_models::v2_object_utils::ObjectAggregatedDataMapping,
token_v2_models::v2_token_utils::TokenStandard,
},
utils::util::standardize_address,
};
use ahash::AHashMap;
use allocative_derive::Allocative;
use aptos_protos::transaction::v1::{DeleteResource, WriteResource};
use bigdecimal::{BigDecimal, Zero};
use field_count::FieldCount;
use parquet_derive::ParquetRecordWriter;
use serde::{Deserialize, Serialize};

#[derive(
Allocative, Clone, Debug, Default, Deserialize, FieldCount, ParquetRecordWriter, Serialize,
)]
pub struct FungibleAssetBalance {
pub txn_version: i64,
pub write_set_change_index: i64,
pub storage_id: String,
pub owner_address: String,
pub asset_type: String,
pub is_primary: bool,
pub is_frozen: bool,
pub amount: String, // it is a string representation of the u128
#[allocative(skip)]
pub block_timestamp: chrono::NaiveDateTime,
pub token_standard: String,
}

impl NamedTable for FungibleAssetBalance {
const TABLE_NAME: &'static str = "fungible_asset_balances";
}

impl HasVersion for FungibleAssetBalance {
fn version(&self) -> i64 {
self.txn_version
}
}

impl GetTimeStamp for FungibleAssetBalance {
fn get_timestamp(&self) -> chrono::NaiveDateTime {
self.block_timestamp
}
}

impl FungibleAssetBalance {
/// Basically just need to index FA Store, but we'll need to look up FA metadata
pub async fn get_v2_from_write_resource(
write_resource: &WriteResource,
write_set_change_index: i64,
txn_version: i64,
txn_timestamp: chrono::NaiveDateTime,
object_metadatas: &ObjectAggregatedDataMapping,
) -> anyhow::Result<Option<(Self, CurrentFungibleAssetBalance)>> {
if let Some(inner) = &FungibleAssetStore::from_write_resource(write_resource, txn_version)?
{
let storage_id = standardize_address(write_resource.address.as_str());
// Need to get the object of the store
if let Some(object_data) = object_metadatas.get(&storage_id) {
let object = &object_data.object.object_core;
let owner_address = object.get_owner_address();
let asset_type = inner.metadata.get_reference_address();
let is_primary = Self::is_primary(&owner_address, &asset_type, &storage_id);

let concurrent_balance = object_data
.concurrent_fungible_asset_balance
.as_ref()
.map(|concurrent_fungible_asset_balance| {
concurrent_fungible_asset_balance.balance.value.clone()
});

let coin_balance = Self {
txn_version,
write_set_change_index,
storage_id: storage_id.clone(),
owner_address: owner_address.clone(),
asset_type: asset_type.clone(),
is_primary,
is_frozen: inner.frozen,
amount: concurrent_balance
.clone()
.unwrap_or_else(|| inner.balance.clone())
.to_string(),
block_timestamp: txn_timestamp,
token_standard: TokenStandard::V2.to_string(),
};
let current_coin_balance = CurrentFungibleAssetBalance {
storage_id,
owner_address,
asset_type: asset_type.clone(),
is_primary,
is_frozen: inner.frozen,
amount: concurrent_balance.unwrap_or_else(|| inner.balance.clone()),
last_transaction_version: txn_version,
last_transaction_timestamp: txn_timestamp,
token_standard: TokenStandard::V2.to_string(),
};
return Ok(Some((coin_balance, current_coin_balance)));
}
}

Ok(None)
}

pub fn get_v1_from_delete_resource(
delete_resource: &DeleteResource,
write_set_change_index: i64,
txn_version: i64,
txn_timestamp: chrono::NaiveDateTime,
) -> anyhow::Result<Option<(Self, CurrentFungibleAssetBalance, EventToCoinType)>> {
if let Some(CoinResource::CoinStoreDeletion) =
&CoinResource::from_delete_resource(delete_resource, txn_version)?
{
let coin_info_type = &CoinInfoType::from_move_type(
&delete_resource.r#type.as_ref().unwrap().generic_type_params[0],
delete_resource.type_str.as_ref(),
txn_version,
);
if let Some(coin_type) = coin_info_type.get_coin_type_below_max() {
let owner_address = standardize_address(delete_resource.address.as_str());
let storage_id =
CoinInfoType::get_storage_id(coin_type.as_str(), owner_address.as_str());
let coin_balance = Self {
txn_version,
write_set_change_index,
storage_id: storage_id.clone(),
owner_address: owner_address.clone(),
asset_type: coin_type.clone(),
is_primary: true,
is_frozen: false,
amount: "0".to_string(),
block_timestamp: txn_timestamp,
token_standard: TokenStandard::V1.to_string(),
};
let current_coin_balance = CurrentFungibleAssetBalance {
storage_id,
owner_address,
asset_type: coin_type.clone(),
is_primary: true,
is_frozen: false,
amount: BigDecimal::zero(),
last_transaction_version: txn_version,
last_transaction_timestamp: txn_timestamp,
token_standard: TokenStandard::V1.to_string(),
};
return Ok(Some((
coin_balance,
current_coin_balance,
AHashMap::default(),
)));
}
}
Ok(None)
}

/// Getting coin balances from resources for v1
/// If the fully qualified coin type is too long (currently 1000 length), we exclude from indexing
pub fn get_v1_from_write_resource(
write_resource: &WriteResource,
write_set_change_index: i64,
txn_version: i64,
txn_timestamp: chrono::NaiveDateTime,
) -> anyhow::Result<Option<(Self, CurrentFungibleAssetBalance, EventToCoinType)>> {
if let Some(CoinResource::CoinStoreResource(inner)) =
&CoinResource::from_write_resource(write_resource, txn_version)?
{
let coin_info_type = &CoinInfoType::from_move_type(
&write_resource.r#type.as_ref().unwrap().generic_type_params[0],
write_resource.type_str.as_ref(),
txn_version,
);
if let Some(coin_type) = coin_info_type.get_coin_type_below_max() {
let owner_address = standardize_address(write_resource.address.as_str());
let storage_id =
CoinInfoType::get_storage_id(coin_type.as_str(), owner_address.as_str());
let coin_balance = Self {
txn_version,
write_set_change_index,
storage_id: storage_id.clone(),
owner_address: owner_address.clone(),
asset_type: coin_type.clone(),
is_primary: true,
is_frozen: inner.frozen,
amount: inner.coin.value.clone().to_string(),
block_timestamp: txn_timestamp,
token_standard: TokenStandard::V1.to_string(),
};
let current_coin_balance = CurrentFungibleAssetBalance {
storage_id,
owner_address,
asset_type: coin_type.clone(),
is_primary: true,
is_frozen: inner.frozen,
amount: inner.coin.value.clone(),
last_transaction_version: txn_version,
last_transaction_timestamp: txn_timestamp,
token_standard: TokenStandard::V1.to_string(),
};
let event_to_coin_mapping: EventToCoinType = AHashMap::from([
(
inner.withdraw_events.guid.id.get_standardized(),
coin_type.clone(),
),
(inner.deposit_events.guid.id.get_standardized(), coin_type),
]);
return Ok(Some((
coin_balance,
current_coin_balance,
event_to_coin_mapping,
)));
}
}
Ok(None)
}

/// Primary store address are derived from the owner address and object address in this format: sha3_256([source | object addr | 0xFC]).
/// This function expects the addresses to have length 66
pub fn is_primary(
owner_address: &str,
metadata_address: &str,
fungible_store_address: &str,
) -> bool {
fungible_store_address
== get_primary_fungible_store_address(owner_address, metadata_address).unwrap()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ fn get_paired_metadata_address(coin_type_name: &str) -> String {
}
}

fn get_primary_fungible_store_address(
pub fn get_primary_fungible_store_address(
owner_address: &str,
metadata_address: &str,
) -> anyhow::Result<String> {
Expand Down
6 changes: 1 addition & 5 deletions rust/processor/src/processors/fungible_asset_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -365,7 +365,7 @@ impl ProcessorTrait for FungibleAssetProcessor {
mut fungible_asset_balances,
mut current_fungible_asset_balances,
current_unified_fungible_asset_balances,
mut coin_supply,
coin_supply,
) = parse_v2_coin(&transactions).await;

let processing_duration_in_secs = processing_start.elapsed().as_secs_f64();
Expand All @@ -389,10 +389,6 @@ impl ProcessorTrait for FungibleAssetProcessor {
current_fungible_asset_balances.clear();
}

if self.deprecated_tables.contains(TableFlags::COIN_SUPPLY) {
coin_supply.clear();
}

let tx_result = insert_to_db(
self.get_pool(),
self.name(),
Expand Down
Loading

0 comments on commit ed72b46

Please sign in to comment.