From 6b0c7ceb4b1809a017866bb0c239653ffdb13941 Mon Sep 17 00:00:00 2001 From: Yuun Lim <38443641+yuunlimm@users.noreply.github.com> Date: Fri, 26 Jul 2024 17:58:25 -0700 Subject: [PATCH] add parquet_token_v2_processor (#462) * temp * remove logs * add more metrics * add parquet_token_v2_processor * exclude collection v2 * add struct count map * rebase * lint * use constant * lint --- .../parquet_v2_fungible_asset_balances.rs | 7 +- .../db/common/models/token_v2_models/mod.rs | 5 + .../token_v2_models/parquet_v2_collections.rs | 306 ++++++++++++++ .../token_v2_models/parquet_v2_token_datas.rs | 256 ++++++++++++ .../parquet_v2_token_ownerships.rs | 272 +++++++++++++ .../models/user_transactions_models/mod.rs | 2 + .../parquet_signatures.rs | 22 + rust/processor/src/processors/mod.rs | 4 + .../src/processors/parquet_processors/mod.rs | 3 +- .../parquet_default_processor.rs | 1 - .../parquet_token_v2_processor.rs | 383 ++++++++++++++++++ rust/processor/src/worker.rs | 8 + 12 files changed, 1266 insertions(+), 3 deletions(-) create mode 100644 rust/processor/src/db/common/models/token_v2_models/parquet_v2_collections.rs create mode 100644 rust/processor/src/db/common/models/token_v2_models/parquet_v2_token_datas.rs create mode 100644 rust/processor/src/db/common/models/token_v2_models/parquet_v2_token_ownerships.rs create mode 100644 rust/processor/src/db/common/models/user_transactions_models/parquet_signatures.rs create mode 100644 rust/processor/src/processors/parquet_processors/parquet_token_v2_processor.rs diff --git a/rust/processor/src/db/common/models/fungible_asset_models/parquet_v2_fungible_asset_balances.rs b/rust/processor/src/db/common/models/fungible_asset_models/parquet_v2_fungible_asset_balances.rs index 0c648c7cb..d5140e92f 100644 --- a/rust/processor/src/db/common/models/fungible_asset_models/parquet_v2_fungible_asset_balances.rs +++ b/rust/processor/src/db/common/models/fungible_asset_models/parquet_v2_fungible_asset_balances.rs @@ -26,9 +26,14 @@ use allocative_derive::Allocative; use aptos_protos::transaction::v1::{DeleteResource, WriteResource}; use bigdecimal::{BigDecimal, Zero}; use field_count::FieldCount; +use lazy_static::lazy_static; use parquet_derive::ParquetRecordWriter; use serde::{Deserialize, Serialize}; +lazy_static! 
{ + pub static ref DEFAULT_AMOUNT_VALUE: String = "0".to_string(); +} + #[derive( Allocative, Clone, Debug, Default, Deserialize, FieldCount, ParquetRecordWriter, Serialize, )] @@ -147,7 +152,7 @@ impl FungibleAssetBalance { asset_type: coin_type.clone(), is_primary: true, is_frozen: false, - amount: "0".to_string(), + amount: DEFAULT_AMOUNT_VALUE.clone(), block_timestamp: txn_timestamp, token_standard: TokenStandard::V1.to_string(), }; diff --git a/rust/processor/src/db/common/models/token_v2_models/mod.rs b/rust/processor/src/db/common/models/token_v2_models/mod.rs index 49bd71da5..3a2aa0567 100644 --- a/rust/processor/src/db/common/models/token_v2_models/mod.rs +++ b/rust/processor/src/db/common/models/token_v2_models/mod.rs @@ -8,3 +8,8 @@ pub mod v2_token_datas; pub mod v2_token_metadata; pub mod v2_token_ownerships; pub mod v2_token_utils; + +// parquet models +// pub mod parquet_v2_collections; // revisit this +pub mod parquet_v2_token_datas; +pub mod parquet_v2_token_ownerships; diff --git a/rust/processor/src/db/common/models/token_v2_models/parquet_v2_collections.rs b/rust/processor/src/db/common/models/token_v2_models/parquet_v2_collections.rs new file mode 100644 index 000000000..02510c419 --- /dev/null +++ b/rust/processor/src/db/common/models/token_v2_models/parquet_v2_collections.rs @@ -0,0 +1,306 @@ +// Copyright © Aptos Foundation +// SPDX-License-Identifier: Apache-2.0 + +// This is required because a diesel macro makes clippy sad +#![allow(clippy::extra_unused_lifetimes)] +#![allow(clippy::unused_unit)] + +use crate::{ + bq_analytics::generic_parquet_processor::{GetTimeStamp, HasVersion, NamedTable}, + db::common::models::{ + object_models::v2_object_utils::ObjectAggregatedDataMapping, + token_models::{ + collection_datas::CollectionData, + token_utils::{CollectionDataIdType, TokenWriteSet}, + tokens::TableHandleToOwner, + }, + token_v2_models::{ + v2_collections::CreatorFromCollectionTableV1, + v2_token_utils::{TokenStandard, V2TokenResource}, + }, + }, + utils::{database::DbPoolConnection, util::standardize_address}, +}; +use allocative_derive::Allocative; +use anyhow::Context; +use aptos_protos::transaction::v1::{WriteResource, WriteTableItem}; +use bigdecimal::{BigDecimal, Zero}; +use diesel::{sql_query, sql_types::Text}; +use diesel_async::RunQueryDsl; +use field_count::FieldCount; +use parquet_derive::ParquetRecordWriter; +use serde::{Deserialize, Serialize}; + +#[derive( + Allocative, Clone, Debug, Default, Deserialize, FieldCount, ParquetRecordWriter, Serialize, +)] +pub struct CollectionV2 { + pub txn_version: i64, + pub write_set_change_index: i64, + pub collection_id: String, + pub creator_address: String, + pub collection_name: String, + pub description: String, + pub uri: String, + pub current_supply: String, + pub max_supply: Option, + pub total_minted_v2: Option, + pub mutable_description: Option, + pub mutable_uri: Option, + pub table_handle_v1: Option, + pub token_standard: String, + #[allocative(skip)] + pub block_timestamp: chrono::NaiveDateTime, +} + +impl NamedTable for CollectionV2 { + const TABLE_NAME: &'static str = "collection_v2"; +} + +impl HasVersion for CollectionV2 { + fn version(&self) -> i64 { + self.txn_version + } +} + +impl GetTimeStamp for CollectionV2 { + fn get_timestamp(&self) -> chrono::NaiveDateTime { + self.block_timestamp + } +} + +impl CollectionV2 { + pub fn get_v2_from_write_resource( + write_resource: &WriteResource, + txn_version: i64, + write_set_change_index: i64, + txn_timestamp: chrono::NaiveDateTime, + 
object_metadatas: &ObjectAggregatedDataMapping, + ) -> anyhow::Result> { + let type_str = crate::db::common::models::default_models::move_resources::MoveResource::get_outer_type_from_write_resource(write_resource); + if !V2TokenResource::is_resource_supported(type_str.as_str()) { + return Ok(None); + } + let resource = crate::db::common::models::default_models::move_resources::MoveResource::from_write_resource( + write_resource, + 0, // Placeholder, this isn't used anyway + txn_version, + 0, // Placeholder, this isn't used anyway + ); + + if let V2TokenResource::Collection(inner) = &V2TokenResource::from_resource( + &type_str, + resource.data.as_ref().unwrap(), + txn_version, + )? { + let (mut current_supply, mut max_supply, mut total_minted_v2) = + (BigDecimal::zero(), None, None); + let (mut mutable_description, mut mutable_uri) = (None, None); + if let Some(object_data) = object_metadatas.get(&resource.address) { + // Getting supply data (prefer fixed supply over unlimited supply although they should never appear at the same time anyway) + let fixed_supply = object_data.fixed_supply.as_ref(); + let unlimited_supply = object_data.unlimited_supply.as_ref(); + if let Some(supply) = unlimited_supply { + (current_supply, max_supply, total_minted_v2) = ( + supply.current_supply.clone(), + None, + Some(supply.total_minted.clone()), + ); + } + if let Some(supply) = fixed_supply { + (current_supply, max_supply, total_minted_v2) = ( + supply.current_supply.clone(), + Some(supply.max_supply.clone()), + Some(supply.total_minted.clone()), + ); + } + + // Aggregator V2 enables a separate struct for supply + let concurrent_supply = object_data.concurrent_supply.as_ref(); + if let Some(supply) = concurrent_supply { + (current_supply, max_supply, total_minted_v2) = ( + supply.current_supply.value.clone(), + if supply.current_supply.max_value == u64::MAX.into() { + None + } else { + Some(supply.current_supply.max_value.clone()) + }, + Some(supply.total_minted.value.clone()), + ); + } + + // Getting collection mutability config from AptosCollection + let collection = object_data.aptos_collection.as_ref(); + if let Some(collection) = collection { + mutable_description = Some(collection.mutable_description); + mutable_uri = Some(collection.mutable_uri); + } + } else { + // ObjectCore should not be missing, returning from entire function early + return Ok(None); + } + + let collection_id = resource.address.clone(); + let creator_address = inner.get_creator_address(); + let collection_name = inner.get_name_trunc(); + let description = inner.description.clone(); + let uri = inner.get_uri_trunc(); + + Ok(Some(Self { + txn_version, + write_set_change_index, + collection_id: collection_id.clone(), + creator_address: creator_address.clone(), + collection_name: collection_name.clone(), + description: description.clone(), + uri: uri.clone(), + current_supply: current_supply.to_string(), + max_supply: Some(max_supply.clone().unwrap().clone().to_string()), + total_minted_v2: Some(total_minted_v2.clone().unwrap().clone().to_string()), + mutable_description, + mutable_uri, + table_handle_v1: None, + token_standard: TokenStandard::V2.to_string(), + block_timestamp: txn_timestamp, + })) + } else { + Ok(None) + } + } + + pub async fn get_v1_from_write_table_item( + table_item: &WriteTableItem, + txn_version: i64, + write_set_change_index: i64, + txn_timestamp: chrono::NaiveDateTime, + table_handle_to_owner: &TableHandleToOwner, + conn: &mut DbPoolConnection<'_>, + query_retries: u32, + query_retry_delay_ms: u64, + ) -> 
anyhow::Result> { + let table_item_data = table_item.data.as_ref().unwrap(); + + let maybe_collection_data = match TokenWriteSet::from_table_item_type( + table_item_data.value_type.as_str(), + &table_item_data.value, + txn_version, + )? { + Some(TokenWriteSet::CollectionData(inner)) => Some(inner), + _ => None, + }; + if let Some(collection_data) = maybe_collection_data { + let table_handle = table_item.handle.to_string(); + let maybe_creator_address = table_handle_to_owner + .get(&standardize_address(&table_handle)) + .map(|table_metadata| table_metadata.get_owner_address()); + let mut creator_address = match maybe_creator_address { + Some(ca) => ca, + None => { + match Self::get_collection_creator_for_v1( + conn, + &table_handle, + query_retries, + query_retry_delay_ms, + ) + .await + .context(format!( + "Failed to get collection creator for table handle {}, txn version {}", + table_handle, txn_version + )) { + Ok(ca) => ca, + Err(_) => { + // Try our best by getting from the older collection data + match CollectionData::get_collection_creator( + conn, + &table_handle, + query_retries, + query_retry_delay_ms, + ) + .await + { + Ok(creator) => creator, + Err(_) => { + tracing::error!( + transaction_version = txn_version, + lookup_key = &table_handle, + "Failed to get collection v2 creator for table handle. You probably should backfill db." + ); + return Ok(None); + }, + } + }, + } + }, + }; + creator_address = standardize_address(&creator_address); + let collection_id_struct = + CollectionDataIdType::new(creator_address, collection_data.get_name().to_string()); + let collection_id = collection_id_struct.to_id(); + let collection_name = collection_data.get_name_trunc(); + let uri = collection_data.get_uri_trunc(); + + Ok(Some(Self { + txn_version, + write_set_change_index, + collection_id: collection_id.clone(), + creator_address: collection_id_struct.creator.clone(), + collection_name: collection_name.clone(), + description: collection_data.description.clone(), + uri: uri.clone(), + current_supply: collection_data.supply.to_string(), + max_supply: Some(collection_data.maximum.to_string()), + total_minted_v2: None, + mutable_uri: Some(collection_data.mutability_config.uri), + mutable_description: Some(collection_data.mutability_config.description), + table_handle_v1: Some(table_handle.clone()), + token_standard: TokenStandard::V1.to_string(), + block_timestamp: txn_timestamp, + })) + } else { + Ok(None) + } + } + + /// If collection data is not in resources of the same transaction, then try looking for it in the database. Since collection owner + /// cannot change, we can just look in the current_collection_datas table. + /// Retrying a few times since this collection could've been written in a separate thread. 
+    async fn get_collection_creator_for_v1(
+        conn: &mut DbPoolConnection<'_>,
+        table_handle: &str,
+        query_retries: u32,
+        query_retry_delay_ms: u64,
+    ) -> anyhow::Result<String> {
+        let mut tried = 0;
+        while tried < query_retries {
+            tried += 1;
+            match Self::get_by_table_handle(conn, table_handle).await {
+                Ok(creator) => return Ok(creator),
+                Err(_) => {
+                    if tried < query_retries {
+                        tokio::time::sleep(std::time::Duration::from_millis(query_retry_delay_ms))
+                            .await;
+                    }
+                },
+            }
+        }
+        Err(anyhow::anyhow!("Failed to get collection creator"))
+    }
+
+    /// TODO: Change this to a KV store
+    async fn get_by_table_handle(
+        conn: &mut DbPoolConnection<'_>,
+        table_handle: &str,
+    ) -> anyhow::Result<String> {
+        let mut res: Vec<Option<CreatorFromCollectionTableV1>> = sql_query(
+            "SELECT creator_address FROM current_collections_v2 WHERE table_handle_v1 = $1",
+        )
+        .bind::<Text, _>(table_handle)
+        .get_results(conn)
+        .await?;
+        Ok(res
+            .pop()
+            .context("collection result empty")?
+            .context("collection result null")?
+            .creator_address)
+    }
+}
diff --git a/rust/processor/src/db/common/models/token_v2_models/parquet_v2_token_datas.rs b/rust/processor/src/db/common/models/token_v2_models/parquet_v2_token_datas.rs
new file mode 100644
index 000000000..0f06fc1d6
--- /dev/null
+++ b/rust/processor/src/db/common/models/token_v2_models/parquet_v2_token_datas.rs
@@ -0,0 +1,256 @@
+// Copyright © Aptos Foundation
+// SPDX-License-Identifier: Apache-2.0
+
+// This is required because a diesel macro makes clippy sad
+#![allow(clippy::extra_unused_lifetimes)]
+#![allow(clippy::unused_unit)]
+
+use crate::{
+    bq_analytics::generic_parquet_processor::{GetTimeStamp, HasVersion, NamedTable},
+    db::common::models::{
+        object_models::v2_object_utils::ObjectAggregatedDataMapping,
+        token_models::token_utils::TokenWriteSet,
+        token_v2_models::{
+            v2_token_datas::CurrentTokenDataV2,
+            v2_token_utils::{TokenStandard, TokenV2, TokenV2Burned},
+        },
+    },
+    utils::util::standardize_address,
+};
+use allocative_derive::Allocative;
+use anyhow::Context;
+use aptos_protos::transaction::v1::{DeleteResource, WriteResource, WriteTableItem};
+use bigdecimal::ToPrimitive;
+use field_count::FieldCount;
+use parquet_derive::ParquetRecordWriter;
+use serde::{Deserialize, Serialize};
+
+#[derive(
+    Allocative, Clone, Debug, Default, Deserialize, FieldCount, ParquetRecordWriter, Serialize,
+)]
+pub struct TokenDataV2 {
+    pub txn_version: i64,
+    pub write_set_change_index: i64,
+    pub token_data_id: String,
+    pub collection_id: String,
+    pub token_name: String,
+    pub largest_property_version_v1: Option<u64>,
+    pub token_uri: String,
+    pub token_properties: String,
+    pub description: String,
+    pub token_standard: String,
+    pub is_fungible_v2: Option<bool>,
+    #[allocative(skip)]
+    pub block_timestamp: chrono::NaiveDateTime,
+    pub is_deleted_v2: Option<bool>,
+}
+
+impl NamedTable for TokenDataV2 {
+    const TABLE_NAME: &'static str = "token_datas_v2";
+}
+
+impl HasVersion for TokenDataV2 {
+    fn version(&self) -> i64 {
+        self.txn_version
+    }
+}
+
+impl GetTimeStamp for TokenDataV2 {
+    fn get_timestamp(&self) -> chrono::NaiveDateTime {
+        self.block_timestamp
+    }
+}
+
+impl TokenDataV2 {
+    // TODO: remove the useless_asref lint when new clippy nightly is released.
+    #[allow(clippy::useless_asref)]
+    pub fn get_v2_from_write_resource(
+        write_resource: &WriteResource,
+        txn_version: i64,
+        write_set_change_index: i64,
+        txn_timestamp: chrono::NaiveDateTime,
+        object_metadatas: &ObjectAggregatedDataMapping,
+    ) -> anyhow::Result<Option<Self>> {
+        if let Some(inner) = &TokenV2::from_write_resource(write_resource, txn_version)?
{ + let token_data_id = standardize_address(&write_resource.address.to_string()); + let mut token_name = inner.get_name_trunc(); + let is_fungible_v2; + // Get token properties from 0x4::property_map::PropertyMap + let mut token_properties = serde_json::Value::Null; + if let Some(object_metadata) = object_metadatas.get(&token_data_id) { + let fungible_asset_metadata = object_metadata.fungible_asset_metadata.as_ref(); + if fungible_asset_metadata.is_some() { + is_fungible_v2 = Some(true); + } else { + is_fungible_v2 = Some(false); + } + token_properties = object_metadata + .property_map + .as_ref() + .map(|m| m.inner.clone()) + .unwrap_or(token_properties); + // In aggregator V2 name is now derived from a separate struct + if let Some(token_identifier) = object_metadata.token_identifier.as_ref() { + token_name = token_identifier.get_name_trunc(); + } + } else { + // ObjectCore should not be missing, returning from entire function early + return Ok(None); + } + + let collection_id = inner.get_collection_address(); + let token_uri = inner.get_uri_trunc(); + + Ok(Some(Self { + txn_version, + write_set_change_index, + token_data_id: token_data_id.clone(), + collection_id: collection_id.clone(), + token_name: token_name.clone(), + largest_property_version_v1: None, + token_uri: token_uri.clone(), + token_properties: canonical_json::to_string(&token_properties.clone()) + .context("Failed to serialize token properties")?, + description: inner.description.clone(), + token_standard: TokenStandard::V2.to_string(), + is_fungible_v2, + block_timestamp: txn_timestamp, + is_deleted_v2: None, + })) + } else { + Ok(None) + } + } + + /// This handles the case where token is burned but objectCore is still there + pub async fn get_burned_nft_v2_from_write_resource( + write_resource: &WriteResource, + txn_version: i64, + txn_timestamp: chrono::NaiveDateTime, + tokens_burned: &TokenV2Burned, + ) -> anyhow::Result> { + let token_data_id = standardize_address(&write_resource.address.to_string()); + // reminder that v1 events won't get to this codepath + if let Some(burn_event_v2) = tokens_burned.get(&standardize_address(&token_data_id)) { + Ok(Some(CurrentTokenDataV2 { + token_data_id, + collection_id: burn_event_v2.get_collection_address(), + token_name: "".to_string(), + maximum: None, + supply: None, + largest_property_version_v1: None, + token_uri: "".to_string(), + token_properties: serde_json::Value::Null, + description: "".to_string(), + token_standard: TokenStandard::V2.to_string(), + is_fungible_v2: Some(false), + last_transaction_version: txn_version, + last_transaction_timestamp: txn_timestamp, + decimals: None, + is_deleted_v2: Some(true), + })) + } else { + Ok(None) + } + } + + /// This handles the case where token is burned and objectCore is deleted + pub async fn get_burned_nft_v2_from_delete_resource( + delete_resource: &DeleteResource, + txn_version: i64, + txn_timestamp: chrono::NaiveDateTime, + tokens_burned: &TokenV2Burned, + ) -> anyhow::Result> { + let token_data_id = standardize_address(&delete_resource.address.to_string()); + // reminder that v1 events won't get to this codepath + if let Some(burn_event_v2) = tokens_burned.get(&standardize_address(&token_data_id)) { + Ok(Some(CurrentTokenDataV2 { + token_data_id, + collection_id: burn_event_v2.get_collection_address(), + token_name: "".to_string(), + maximum: None, + supply: None, + largest_property_version_v1: None, + token_uri: "".to_string(), + token_properties: serde_json::Value::Null, + description: "".to_string(), + token_standard: 
TokenStandard::V2.to_string(), + is_fungible_v2: Some(false), + last_transaction_version: txn_version, + last_transaction_timestamp: txn_timestamp, + decimals: None, + is_deleted_v2: Some(true), + })) + } else { + Ok(None) + } + } + + pub fn get_v1_from_write_table_item( + table_item: &WriteTableItem, + txn_version: i64, + write_set_change_index: i64, + txn_timestamp: chrono::NaiveDateTime, + ) -> anyhow::Result> { + let table_item_data = table_item.data.as_ref().unwrap(); + + let maybe_token_data = match TokenWriteSet::from_table_item_type( + table_item_data.value_type.as_str(), + &table_item_data.value, + txn_version, + )? { + Some(TokenWriteSet::TokenData(inner)) => Some(inner), + _ => None, + }; + + if let Some(token_data) = maybe_token_data { + let maybe_token_data_id = match TokenWriteSet::from_table_item_type( + table_item_data.key_type.as_str(), + &table_item_data.key, + txn_version, + )? { + Some(TokenWriteSet::TokenDataId(inner)) => Some(inner), + _ => None, + }; + if let Some(token_data_id_struct) = maybe_token_data_id { + let collection_id = token_data_id_struct.get_collection_id(); + let token_data_id = token_data_id_struct.to_id(); + let token_name = token_data_id_struct.get_name_trunc(); + let token_uri = token_data.get_uri_trunc(); + + return Ok(Some(Self { + txn_version, + write_set_change_index, + token_data_id: token_data_id.clone(), + collection_id: collection_id.clone(), + token_name: token_name.clone(), + largest_property_version_v1: Some( + token_data + .largest_property_version + .clone() + .to_u64() + .unwrap(), + ), + token_uri: token_uri.clone(), + token_properties: canonical_json::to_string( + &token_data.default_properties.clone(), + ) + .context("Failed to serialize token properties")?, + description: token_data.description.clone(), + token_standard: TokenStandard::V1.to_string(), + is_fungible_v2: None, + block_timestamp: txn_timestamp, + is_deleted_v2: None, + })); + } else { + tracing::warn!( + transaction_version = txn_version, + key_type = table_item_data.key_type, + key = table_item_data.key, + "Expecting token_data_id as key for value = token_data" + ); + } + } + Ok(None) + } +} diff --git a/rust/processor/src/db/common/models/token_v2_models/parquet_v2_token_ownerships.rs b/rust/processor/src/db/common/models/token_v2_models/parquet_v2_token_ownerships.rs new file mode 100644 index 000000000..855b324a3 --- /dev/null +++ b/rust/processor/src/db/common/models/token_v2_models/parquet_v2_token_ownerships.rs @@ -0,0 +1,272 @@ +// Copyright © Aptos Foundation +// SPDX-License-Identifier: Apache-2.0 + +// This is required because a diesel macro makes clippy sad +#![allow(clippy::extra_unused_lifetimes)] +#![allow(clippy::unused_unit)] + +use crate::{ + bq_analytics::generic_parquet_processor::{GetTimeStamp, HasVersion, NamedTable}, + db::common::models::{ + fungible_asset_models::parquet_v2_fungible_asset_balances::DEFAULT_AMOUNT_VALUE, + object_models::v2_object_utils::ObjectAggregatedDataMapping, + token_models::{token_utils::TokenWriteSet, tokens::TableHandleToOwner}, + token_v2_models::{ + parquet_v2_token_datas::TokenDataV2, v2_token_ownerships::CurrentTokenOwnershipV2, + v2_token_utils::TokenStandard, + }, + }, + utils::util::{ensure_not_negative, standardize_address}, +}; +use allocative_derive::Allocative; +use anyhow::Context; +use aptos_protos::transaction::v1::{DeleteTableItem, WriteTableItem}; +use bigdecimal::{BigDecimal, ToPrimitive, Zero}; +use field_count::FieldCount; +use parquet_derive::ParquetRecordWriter; +use serde::{Deserialize, 
Serialize};
+
+const LEGACY_DEFAULT_PROPERTY_VERSION: u64 = 0;
+
+#[derive(
+    Allocative, Clone, Debug, Default, Deserialize, FieldCount, ParquetRecordWriter, Serialize,
+)]
+pub struct TokenOwnershipV2 {
+    pub txn_version: i64,
+    pub write_set_change_index: i64,
+    pub token_data_id: String,
+    pub property_version_v1: u64,
+    pub owner_address: Option<String>,
+    pub storage_id: String,
+    pub amount: String, // this is a string representation of a bigdecimal
+    pub table_type_v1: Option<String>,
+    pub token_properties_mutated_v1: Option<String>,
+    pub is_soulbound_v2: Option<bool>,
+    pub token_standard: String,
+    #[allocative(skip)]
+    pub block_timestamp: chrono::NaiveDateTime,
+    pub non_transferrable_by_owner: Option<bool>,
+}
+
+impl NamedTable for TokenOwnershipV2 {
+    const TABLE_NAME: &'static str = "token_ownerships_v2";
+}
+
+impl HasVersion for TokenOwnershipV2 {
+    fn version(&self) -> i64 {
+        self.txn_version
+    }
+}
+
+impl GetTimeStamp for TokenOwnershipV2 {
+    fn get_timestamp(&self) -> chrono::NaiveDateTime {
+        self.block_timestamp
+    }
+}
+
+impl TokenOwnershipV2 {
+    /// For nfts it's the same resources that we parse tokendatas from so we leverage the work done in there to get ownership data
+    /// Vecs are returned because there could be multiple transfers and we need to document each one here.
+    pub fn get_nft_v2_from_token_data(
+        token_data: &TokenDataV2,
+        object_metadatas: &ObjectAggregatedDataMapping,
+    ) -> anyhow::Result<Vec<Self>> {
+        let mut ownerships = vec![];
+        // let mut current_ownerships = AHashMap::new();
+
+        let object_data = object_metadatas
+            .get(&token_data.token_data_id)
+            .context("If token data exists objectcore must exist")?;
+        let object_core = object_data.object.object_core.clone();
+        let token_data_id = token_data.token_data_id.clone();
+        let owner_address = object_core.get_owner_address();
+        let storage_id = token_data_id.clone();
+
+        // is_soulbound currently means if an object is completely untransferrable
+        // OR if only admin can transfer. Only the former is true soulbound but
+        // people might already be using it with the latter meaning so let's include both.
+ let is_soulbound = if object_data.untransferable.as_ref().is_some() { + true + } else { + !object_core.allow_ungated_transfer + }; + let non_transferrable_by_owner = !object_core.allow_ungated_transfer; + + ownerships.push(Self { + txn_version: token_data.txn_version, + write_set_change_index: token_data.write_set_change_index, + token_data_id: token_data_id.clone(), + property_version_v1: LEGACY_DEFAULT_PROPERTY_VERSION, + owner_address: Some(owner_address.clone()), + storage_id: storage_id.clone(), + amount: DEFAULT_AMOUNT_VALUE.clone(), + table_type_v1: None, + token_properties_mutated_v1: None, + is_soulbound_v2: Some(is_soulbound), + token_standard: TokenStandard::V2.to_string(), + block_timestamp: token_data.block_timestamp, + non_transferrable_by_owner: Some(non_transferrable_by_owner), + }); + + // check if token was transferred + for (event_index, transfer_event) in &object_data.transfer_events { + // If it's a self transfer then skip + if transfer_event.get_to_address() == transfer_event.get_from_address() { + continue; + } + ownerships.push(Self { + txn_version: token_data.txn_version, + // set to negative of event index to avoid collison with write set index + write_set_change_index: -1 * event_index, + token_data_id: token_data_id.clone(), + property_version_v1: LEGACY_DEFAULT_PROPERTY_VERSION, + // previous owner + owner_address: Some(transfer_event.get_from_address()), + storage_id: storage_id.clone(), + // soft delete + amount: DEFAULT_AMOUNT_VALUE.clone(), + table_type_v1: None, + token_properties_mutated_v1: None, + is_soulbound_v2: Some(is_soulbound), + token_standard: TokenStandard::V2.to_string(), + block_timestamp: token_data.block_timestamp, + non_transferrable_by_owner: Some(is_soulbound), + }); + } + Ok(ownerships) + } + + /// We want to track tokens in any offer/claims and tokenstore + pub fn get_v1_from_delete_table_item( + table_item: &DeleteTableItem, + txn_version: i64, + write_set_change_index: i64, + txn_timestamp: chrono::NaiveDateTime, + table_handle_to_owner: &TableHandleToOwner, + ) -> anyhow::Result> { + let table_item_data = table_item.data.as_ref().unwrap(); + + let maybe_token_id = match TokenWriteSet::from_table_item_type( + table_item_data.key_type.as_str(), + &table_item_data.key, + txn_version, + )? 
{ + Some(TokenWriteSet::TokenId(inner)) => Some(inner), + _ => None, + }; + + if let Some(token_id_struct) = maybe_token_id { + let table_handle = standardize_address(&table_item.handle.to_string()); + let token_data_id_struct = token_id_struct.token_data_id; + let token_data_id = token_data_id_struct.to_id(); + + let maybe_table_metadata = table_handle_to_owner.get(&table_handle); + let (_, owner_address, table_type) = match maybe_table_metadata { + Some(tm) => { + if tm.table_type != "0x3::token::TokenStore" { + return Ok(None); + } + let owner_address = tm.get_owner_address(); + ( + Some(CurrentTokenOwnershipV2 { + token_data_id: token_data_id.clone(), + property_version_v1: token_id_struct.property_version.clone(), + owner_address: owner_address.clone(), + storage_id: table_handle.clone(), + amount: BigDecimal::zero(), + table_type_v1: Some(tm.table_type.clone()), + token_properties_mutated_v1: None, + is_soulbound_v2: None, + token_standard: TokenStandard::V1.to_string(), + is_fungible_v2: None, + last_transaction_version: txn_version, + last_transaction_timestamp: txn_timestamp, + non_transferrable_by_owner: None, + }), + Some(owner_address), + Some(tm.table_type.clone()), + ) + }, + None => (None, None, None), + }; + + Ok(Some(Self { + txn_version, + write_set_change_index, + token_data_id, + property_version_v1: token_id_struct.property_version.to_u64().unwrap(), + owner_address, + storage_id: table_handle, + amount: DEFAULT_AMOUNT_VALUE.clone(), + table_type_v1: table_type, + token_properties_mutated_v1: None, + is_soulbound_v2: None, + token_standard: TokenStandard::V1.to_string(), + block_timestamp: txn_timestamp, + non_transferrable_by_owner: None, + })) + } else { + Ok(None) + } + } + + /// We want to track tokens in any offer/claims and tokenstore + pub fn get_v1_from_write_table_item( + table_item: &WriteTableItem, + txn_version: i64, + write_set_change_index: i64, + txn_timestamp: chrono::NaiveDateTime, + table_handle_to_owner: &TableHandleToOwner, + ) -> anyhow::Result> { + let table_item_data = table_item.data.as_ref().unwrap(); + + let maybe_token = match TokenWriteSet::from_table_item_type( + table_item_data.value_type.as_str(), + &table_item_data.value, + txn_version, + )? 
{ + Some(TokenWriteSet::Token(inner)) => Some(inner), + _ => None, + }; + + if let Some(token) = maybe_token { + let table_handle = standardize_address(&table_item.handle.to_string()); + let amount = ensure_not_negative(token.amount); + let token_id_struct = token.id; + let token_data_id_struct = token_id_struct.token_data_id; + let token_data_id = token_data_id_struct.to_id(); + + let maybe_table_metadata = table_handle_to_owner.get(&table_handle); + let (owner_address, table_type) = match maybe_table_metadata { + Some(tm) => { + if tm.table_type != "0x3::token::TokenStore" { + return Ok(None); + } + let owner_address = tm.get_owner_address(); + (Some(owner_address), Some(tm.table_type.clone())) + }, + None => (None, None), + }; + + Ok(Some(Self { + txn_version, + write_set_change_index, + token_data_id, + property_version_v1: token_id_struct.property_version.to_u64().unwrap(), + owner_address, + storage_id: table_handle, + amount: amount.to_string(), + table_type_v1: table_type, + token_properties_mutated_v1: Some( + canonical_json::to_string(&token.token_properties).unwrap(), + ), + is_soulbound_v2: None, + token_standard: TokenStandard::V1.to_string(), + block_timestamp: txn_timestamp, + non_transferrable_by_owner: None, + })) + } else { + Ok(None) + } + } +} diff --git a/rust/processor/src/db/common/models/user_transactions_models/mod.rs b/rust/processor/src/db/common/models/user_transactions_models/mod.rs index fbb27686b..9818ff1d9 100644 --- a/rust/processor/src/db/common/models/user_transactions_models/mod.rs +++ b/rust/processor/src/db/common/models/user_transactions_models/mod.rs @@ -3,3 +3,5 @@ pub mod signatures; pub mod user_transactions; +// parquet models +pub mod parquet_signatures; diff --git a/rust/processor/src/db/common/models/user_transactions_models/parquet_signatures.rs b/rust/processor/src/db/common/models/user_transactions_models/parquet_signatures.rs new file mode 100644 index 000000000..933ea8c60 --- /dev/null +++ b/rust/processor/src/db/common/models/user_transactions_models/parquet_signatures.rs @@ -0,0 +1,22 @@ +// Copyright © Aptos Foundation +// SPDX-License-Identifier: Apache-2.0 + +#![allow(clippy::extra_unused_lifetimes)] + +use field_count::FieldCount; +use serde::{Deserialize, Serialize}; + +#[derive(Clone, Debug, Deserialize, FieldCount, Serialize)] +pub struct Signature { + pub txn_version: i64, + pub multi_agent_index: i64, + pub multi_sig_index: i64, + pub transaction_block_height: i64, + pub signer: String, + pub is_sender_primary: bool, + pub type_: String, + pub public_key: String, + pub signature: String, + pub threshold: i64, + pub public_key_indices: String, +} diff --git a/rust/processor/src/processors/mod.rs b/rust/processor/src/processors/mod.rs index ebcaf0c86..4b47306b5 100644 --- a/rust/processor/src/processors/mod.rs +++ b/rust/processor/src/processors/mod.rs @@ -42,6 +42,7 @@ use crate::{ parquet_fungible_asset_processor::{ ParquetFungibleAssetProcessor, ParquetFungibleAssetProcessorConfig, }, + parquet_token_v2_processor::{ParquetTokenV2Processor, ParquetTokenV2ProcessorConfig}, parquet_transaction_metadata_processor::{ ParquetTransactionMetadataProcessor, ParquetTransactionMetadataProcessorConfig, }, @@ -202,6 +203,7 @@ pub enum ProcessorConfig { ParquetTransactionMetadataProcessor(ParquetTransactionMetadataProcessorConfig), ParquetAnsProcessor(ParquetAnsProcessorConfig), ParquetEventsProcessor(ParquetEventsProcessorConfig), + ParquetTokenV2Processor(ParquetTokenV2ProcessorConfig), } impl ProcessorConfig { @@ -219,6 +221,7 @@ impl 
ProcessorConfig { | ProcessorConfig::ParquetTransactionMetadataProcessor(_) | ProcessorConfig::ParquetAnsProcessor(_) | ProcessorConfig::ParquetEventsProcessor(_) + | ProcessorConfig::ParquetTokenV2Processor(_) ) } } @@ -259,6 +262,7 @@ pub enum Processor { ParquetTransactionMetadataProcessor, ParquetAnsProcessor, ParquetEventsProcessor, + ParquetTokenV2Processor, } #[cfg(test)] diff --git a/rust/processor/src/processors/parquet_processors/mod.rs b/rust/processor/src/processors/parquet_processors/mod.rs index a66b74111..833dd8689 100644 --- a/rust/processor/src/processors/parquet_processors/mod.rs +++ b/rust/processor/src/processors/parquet_processors/mod.rs @@ -4,9 +4,10 @@ pub mod parquet_ans_processor; pub mod parquet_default_processor; pub mod parquet_events_processor; pub mod parquet_fungible_asset_processor; +pub mod parquet_token_v2_processor; pub mod parquet_transaction_metadata_processor; -pub const GOOGLE_APPLICATION_CREDENTIALS: &str = "GOOGLE_APPLICATION_CREDENTIALS"; +const GOOGLE_APPLICATION_CREDENTIALS: &str = "GOOGLE_APPLICATION_CREDENTIALS"; pub trait ParquetProcessorTrait { fn parquet_upload_interval_in_secs(&self) -> Duration; diff --git a/rust/processor/src/processors/parquet_processors/parquet_default_processor.rs b/rust/processor/src/processors/parquet_processors/parquet_default_processor.rs index 4f2f6a9df..bad362ddc 100644 --- a/rust/processor/src/processors/parquet_processors/parquet_default_processor.rs +++ b/rust/processor/src/processors/parquet_processors/parquet_default_processor.rs @@ -38,7 +38,6 @@ pub struct ParquetDefaultProcessorConfig { pub max_buffer_size: usize, pub parquet_upload_interval: u64, } - impl ParquetProcessorTrait for ParquetDefaultProcessorConfig { fn parquet_upload_interval_in_secs(&self) -> Duration { Duration::from_secs(self.parquet_upload_interval) diff --git a/rust/processor/src/processors/parquet_processors/parquet_token_v2_processor.rs b/rust/processor/src/processors/parquet_processors/parquet_token_v2_processor.rs new file mode 100644 index 000000000..723254b3c --- /dev/null +++ b/rust/processor/src/processors/parquet_processors/parquet_token_v2_processor.rs @@ -0,0 +1,383 @@ +// Copyright © Aptos Foundation +// SPDX-License-Identifier: Apache-2.0 + +use crate::{ + bq_analytics::{ + create_parquet_handler_loop, generic_parquet_processor::ParquetDataGeneric, + ParquetProcessingResult, + }, + db::common::models::{ + fungible_asset_models::v2_fungible_asset_utils::FungibleAssetMetadata, + object_models::v2_object_utils::{ + ObjectAggregatedData, ObjectAggregatedDataMapping, ObjectWithMetadata, Untransferable, + }, + token_models::tokens::{TableHandleToOwner, TableMetadataForToken}, + token_v2_models::{ + parquet_v2_token_datas::TokenDataV2, + parquet_v2_token_ownerships::TokenOwnershipV2, + v2_token_utils::{ + AptosCollection, ConcurrentSupply, FixedSupply, PropertyMapModel, TokenIdentifiers, + TokenV2, TransferEvent, UnlimitedSupply, + }, + }, + }, + gap_detectors::ProcessingResult, + processors::{parquet_processors::ParquetProcessorTrait, ProcessorName, ProcessorTrait}, + utils::{ + counters::PROCESSOR_UNKNOWN_TYPE_COUNT, + database::ArcDbPool, + util::{parse_timestamp, standardize_address}, + }, +}; +use ahash::AHashMap; +use anyhow::Context; +use aptos_protos::transaction::v1::{transaction::TxnData, write_set_change::Change, Transaction}; +use async_trait::async_trait; +use kanal::AsyncSender; +use serde::{Deserialize, Serialize}; +use std::{fmt::Debug, time::Duration}; + +#[derive(Clone, Debug, Deserialize, Serialize)] 
+#[serde(deny_unknown_fields)]
+pub struct ParquetTokenV2ProcessorConfig {
+    pub google_application_credentials: Option<String>,
+    pub bucket_name: String,
+    pub bucket_root: String,
+    pub parquet_handler_response_channel_size: usize,
+    pub max_buffer_size: usize,
+    pub parquet_upload_interval: u64,
+}
+impl ParquetProcessorTrait for ParquetTokenV2ProcessorConfig {
+    fn parquet_upload_interval_in_secs(&self) -> Duration {
+        Duration::from_secs(self.parquet_upload_interval)
+    }
+}
+
+pub struct ParquetTokenV2Processor {
+    connection_pool: ArcDbPool,
+    v2_token_datas_sender: AsyncSender<ParquetDataGeneric<TokenDataV2>>,
+    v2_token_ownerships_sender: AsyncSender<ParquetDataGeneric<TokenOwnershipV2>>,
+}
+
+impl ParquetTokenV2Processor {
+    pub fn new(
+        connection_pool: ArcDbPool,
+        config: ParquetTokenV2ProcessorConfig,
+        new_gap_detector_sender: AsyncSender<ProcessingResult>,
+    ) -> Self {
+        config.set_google_credentials(config.google_application_credentials.clone());
+
+        let v2_token_datas_sender = create_parquet_handler_loop::<TokenDataV2>(
+            new_gap_detector_sender.clone(),
+            ProcessorName::ParquetTokenV2Processor.into(),
+            config.bucket_name.clone(),
+            config.bucket_root.clone(),
+            config.parquet_handler_response_channel_size,
+            config.max_buffer_size,
+            config.parquet_upload_interval_in_secs(),
+        );
+
+        let v2_token_ownerships_sender = create_parquet_handler_loop::<TokenOwnershipV2>(
+            new_gap_detector_sender.clone(),
+            ProcessorName::ParquetTokenV2Processor.into(),
+            config.bucket_name.clone(),
+            config.bucket_root.clone(),
+            config.parquet_handler_response_channel_size,
+            config.max_buffer_size,
+            config.parquet_upload_interval_in_secs(),
+        );
+
+        Self {
+            connection_pool,
+            v2_token_datas_sender,
+            v2_token_ownerships_sender,
+        }
+    }
+}
+
+impl Debug for ParquetTokenV2Processor {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        let state = &self.connection_pool.state();
+        write!(
+            f,
+            "TokenV2TransactionProcessor {{ connections: {:?} idle_connections: {:?} }}",
+            state.connections, state.idle_connections
+        )
+    }
+}
+
+#[async_trait]
+impl ProcessorTrait for ParquetTokenV2Processor {
+    fn name(&self) -> &'static str {
+        ProcessorName::ParquetTokenV2Processor.into()
+    }
+
+    async fn process_transactions(
+        &self,
+        transactions: Vec<Transaction>,
+        start_version: u64,
+        end_version: u64,
+        _: Option<u64>,
+    ) -> anyhow::Result<ProcessingResult> {
+        let last_transaction_timestamp = transactions.last().unwrap().timestamp.clone();
+        let mut transaction_version_to_struct_count: AHashMap<i64, i64> = AHashMap::new();
+
+        let table_handle_to_owner =
+            TableMetadataForToken::get_table_handle_to_owner_from_transactions(&transactions);
+
+        let (token_datas_v2, token_ownerships_v2) = parse_v2_token(
+            &transactions,
+            &table_handle_to_owner,
+            &mut transaction_version_to_struct_count,
+        )
+        .await;
+
+        let token_data_v2_parquet_data = ParquetDataGeneric {
+            data: token_datas_v2,
+        };
+
+        self.v2_token_datas_sender
+            .send(token_data_v2_parquet_data)
+            .await
+            .context("Failed to send token data v2 parquet data")?;
+
+        let token_ownerships_v2_parquet_data = ParquetDataGeneric {
+            data: token_ownerships_v2,
+        };
+
+        self.v2_token_ownerships_sender
+            .send(token_ownerships_v2_parquet_data)
+            .await
+            .context("Failed to send token ownerships v2 parquet data")?;
+
+        Ok(ProcessingResult::ParquetProcessingResult(
+            ParquetProcessingResult {
+                start_version: start_version as i64,
+                end_version: end_version as i64,
+                last_transaction_timestamp: last_transaction_timestamp.clone(),
+                txn_version_to_struct_count: Some(transaction_version_to_struct_count),
+                parquet_processed_structs: None,
+                table_name: "".to_string(),
+            },
+        ))
+    }
+
+    fn
connection_pool(&self) -> &ArcDbPool { + &self.connection_pool + } +} + +async fn parse_v2_token( + transactions: &[Transaction], + table_handle_to_owner: &TableHandleToOwner, + transaction_version_to_struct_count: &mut AHashMap, +) -> (Vec, Vec) { + // Token V2 and V1 combined + let mut token_datas_v2 = vec![]; + let mut token_ownerships_v2 = vec![]; + + // Get Metadata for token v2 by object + // We want to persist this through the entire batch so that even if a token is burned, + // we can still get the object core metadata for it + let mut token_v2_metadata_helper: ObjectAggregatedDataMapping = AHashMap::new(); + + // Code above is inefficient (multiple passthroughs) so I'm approaching TokenV2 with a cleaner code structure + for txn in transactions { + let txn_version = txn.version; + let txn_data = match txn.txn_data.as_ref() { + Some(data) => data, + None => { + PROCESSOR_UNKNOWN_TYPE_COUNT + .with_label_values(&["TokenV2Processor"]) + .inc(); + tracing::warn!( + transaction_version = txn_version, + "Transaction data doesn't exist" + ); + continue; + }, + }; + let txn_version = txn.version as i64; + let txn_timestamp = parse_timestamp(txn.timestamp.as_ref().unwrap(), txn_version); + let transaction_info = txn.info.as_ref().expect("Transaction info doesn't exist!"); + + if let TxnData::User(user_txn) = txn_data { + // Need to do a first pass to get all the objects + for wsc in transaction_info.changes.iter() { + if let Change::WriteResource(wr) = wsc.change.as_ref().unwrap() { + if let Some(object) = + ObjectWithMetadata::from_write_resource(wr, txn_version).unwrap() + { + token_v2_metadata_helper.insert( + standardize_address(&wr.address.to_string()), + ObjectAggregatedData { + object, + ..ObjectAggregatedData::default() + }, + ); + } + } + } + + // Need to do a second pass to get all the structs related to the object + for wsc in transaction_info.changes.iter() { + if let Change::WriteResource(wr) = wsc.change.as_ref().unwrap() { + let address = standardize_address(&wr.address.to_string()); + if let Some(aggregated_data) = token_v2_metadata_helper.get_mut(&address) { + if let Some(fixed_supply) = + FixedSupply::from_write_resource(wr, txn_version).unwrap() + { + aggregated_data.fixed_supply = Some(fixed_supply); + } + if let Some(unlimited_supply) = + UnlimitedSupply::from_write_resource(wr, txn_version).unwrap() + { + aggregated_data.unlimited_supply = Some(unlimited_supply); + } + if let Some(aptos_collection) = + AptosCollection::from_write_resource(wr, txn_version).unwrap() + { + aggregated_data.aptos_collection = Some(aptos_collection); + } + if let Some(property_map) = + PropertyMapModel::from_write_resource(wr, txn_version).unwrap() + { + aggregated_data.property_map = Some(property_map); + } + if let Some(concurrent_supply) = + ConcurrentSupply::from_write_resource(wr, txn_version).unwrap() + { + aggregated_data.concurrent_supply = Some(concurrent_supply); + } + if let Some(token) = TokenV2::from_write_resource(wr, txn_version).unwrap() + { + aggregated_data.token = Some(token); + } + if let Some(fungible_asset_metadata) = + FungibleAssetMetadata::from_write_resource(wr, txn_version).unwrap() + { + aggregated_data.fungible_asset_metadata = Some(fungible_asset_metadata); + } + if let Some(token_identifier) = + TokenIdentifiers::from_write_resource(wr, txn_version).unwrap() + { + aggregated_data.token_identifier = Some(token_identifier); + } + if let Some(untransferable) = + Untransferable::from_write_resource(wr, txn_version).unwrap() + { + aggregated_data.untransferable = 
Some(untransferable); + } + } + } + } + + // Pass through events to get the burn events and token activities v2 + // This needs to be here because we need the metadata above for token activities + // and burn / transfer events need to come before the next section + for (index, event) in user_txn.events.iter().enumerate() { + if let Some(transfer_events) = + TransferEvent::from_event(event, txn_version).unwrap() + { + if let Some(aggregated_data) = + token_v2_metadata_helper.get_mut(&transfer_events.get_object_address()) + { + // we don't want index to be 0 otherwise we might have collision with write set change index + // note that these will be multiplied by -1 so that it doesn't conflict with wsc index + let index = if index == 0 { + user_txn.events.len() + } else { + index + }; + aggregated_data + .transfer_events + .push((index as i64, transfer_events)); + } + } + } + + for (index, wsc) in transaction_info.changes.iter().enumerate() { + let wsc_index = index as i64; + match wsc.change.as_ref().unwrap() { + Change::WriteTableItem(table_item) => { + if let Some(token_data) = TokenDataV2::get_v1_from_write_table_item( + table_item, + txn_version, + wsc_index, + txn_timestamp, + ) + .unwrap() + { + token_datas_v2.push(token_data); + transaction_version_to_struct_count + .entry(txn_version) + .and_modify(|e| *e += 1) + .or_insert(1); + } + if let Some(token_ownership) = + TokenOwnershipV2::get_v1_from_write_table_item( + table_item, + txn_version, + wsc_index, + txn_timestamp, + table_handle_to_owner, + ) + .unwrap() + { + token_ownerships_v2.push(token_ownership); + transaction_version_to_struct_count + .entry(txn_version) + .and_modify(|e| *e += 1) + .or_insert(1); + } + }, + Change::DeleteTableItem(table_item) => { + if let Some(token_ownership) = + TokenOwnershipV2::get_v1_from_delete_table_item( + table_item, + txn_version, + wsc_index, + txn_timestamp, + table_handle_to_owner, + ) + .unwrap() + { + token_ownerships_v2.push(token_ownership); + transaction_version_to_struct_count + .entry(txn_version) + .and_modify(|e| *e += 1) + .or_insert(1); + } + }, + Change::WriteResource(resource) => { + if let Some(token_data) = TokenDataV2::get_v2_from_write_resource( + resource, + txn_version, + wsc_index, + txn_timestamp, + &token_v2_metadata_helper, + ) + .unwrap() + { + // Add NFT ownership + let mut ownerships = TokenOwnershipV2::get_nft_v2_from_token_data( + &token_data, + &token_v2_metadata_helper, + ) + .unwrap(); + token_ownerships_v2.append(&mut ownerships); + token_datas_v2.push(token_data); + transaction_version_to_struct_count + .entry(txn_version) + .and_modify(|e| *e += ownerships.len() as i64 + 1) + .or_insert(1); + } + }, + _ => {}, + } + } + } + } + + (token_datas_v2, token_ownerships_v2) +} diff --git a/rust/processor/src/worker.rs b/rust/processor/src/worker.rs index 110cfa8e7..94282e50a 100644 --- a/rust/processor/src/worker.rs +++ b/rust/processor/src/worker.rs @@ -23,6 +23,7 @@ use crate::{ parquet_default_processor::ParquetDefaultProcessor, parquet_events_processor::ParquetEventsProcessor, parquet_fungible_asset_processor::ParquetFungibleAssetProcessor, + parquet_token_v2_processor::ParquetTokenV2Processor, parquet_transaction_metadata_processor::ParquetTransactionMetadataProcessor, }, stake_processor::StakeProcessor, @@ -968,6 +969,13 @@ pub fn build_processor( gap_detector_sender.expect("Parquet processor requires a gap detector sender"), )) }, + ProcessorConfig::ParquetTokenV2Processor(config) => { + Processor::from(ParquetTokenV2Processor::new( + db_pool, + 
config.clone(), + gap_detector_sender.expect("Parquet processor requires a gap detector sender"), + )) + }, ProcessorConfig::ParquetEventsProcessor(config) => { Processor::from(ParquetEventsProcessor::new( db_pool,