From 312280bdde8596a16241e9ec6e78f8d7391dec3c Mon Sep 17 00:00:00 2001 From: Yuun Lim <38443641+yuunlimm@users.noreply.github.com> Date: Thu, 25 Jul 2024 11:51:28 -0700 Subject: [PATCH] Add parquet_ans_processor (#455) * Add parquet_ans_processor * add missing model file * add processor file * lint * fix build * rebase and lint * temp * rebase * fix processor name --- .../src/db/common/models/ans_models/mod.rs | 3 + .../ans_models/parquet_ans_lookup_v2.rs | 172 ++++++++++++ rust/processor/src/processors/mod.rs | 4 + .../src/processors/parquet_processors/mod.rs | 2 +- .../parquet_ans_processor.rs | 262 ++++++++++++++++++ rust/processor/src/worker.rs | 6 + 6 files changed, 448 insertions(+), 1 deletion(-) create mode 100644 rust/processor/src/db/common/models/ans_models/parquet_ans_lookup_v2.rs create mode 100644 rust/processor/src/processors/parquet_processors/parquet_ans_processor.rs diff --git a/rust/processor/src/db/common/models/ans_models/mod.rs b/rust/processor/src/db/common/models/ans_models/mod.rs index 5b82f289c..2ad5dcf6f 100644 --- a/rust/processor/src/db/common/models/ans_models/mod.rs +++ b/rust/processor/src/db/common/models/ans_models/mod.rs @@ -4,3 +4,6 @@ pub mod ans_lookup; pub mod ans_lookup_v2; pub mod ans_utils; + +// parquet models +pub mod parquet_ans_lookup_v2; diff --git a/rust/processor/src/db/common/models/ans_models/parquet_ans_lookup_v2.rs b/rust/processor/src/db/common/models/ans_models/parquet_ans_lookup_v2.rs new file mode 100644 index 000000000..1ed0ec49c --- /dev/null +++ b/rust/processor/src/db/common/models/ans_models/parquet_ans_lookup_v2.rs @@ -0,0 +1,172 @@ +// Copyright © Aptos Foundation +// SPDX-License-Identifier: Apache-2.0 + +// This is required because a diesel macro makes clippy sad +#![allow(clippy::extra_unused_lifetimes)] +#![allow(clippy::unused_unit)] + +use crate::{ + bq_analytics::generic_parquet_processor::{GetTimeStamp, HasVersion, NamedTable}, + db::common::models::{ + ans_models::{ + ans_lookup::{AnsPrimaryName, CurrentAnsPrimaryName}, + ans_utils::SetReverseLookupEvent, + }, + token_v2_models::v2_token_utils::TokenStandard, + }, +}; +use allocative_derive::Allocative; +use aptos_protos::transaction::v1::Event; +use field_count::FieldCount; +use parquet_derive::ParquetRecordWriter; +use serde::{Deserialize, Serialize}; + +#[derive( + Allocative, Clone, Default, Debug, Deserialize, FieldCount, ParquetRecordWriter, Serialize, +)] +pub struct AnsPrimaryNameV2 { + pub txn_version: i64, + pub write_set_change_index: i64, + pub registered_address: String, + pub token_standard: String, + pub domain: Option, + pub subdomain: Option, + pub token_name: Option, + pub is_deleted: bool, + #[allocative(skip)] + pub block_timestamp: chrono::NaiveDateTime, +} + +impl NamedTable for AnsPrimaryNameV2 { + const TABLE_NAME: &'static str = "ans_primary_name_v2"; +} + +impl HasVersion for AnsPrimaryNameV2 { + fn version(&self) -> i64 { + self.txn_version + } +} + +impl GetTimeStamp for AnsPrimaryNameV2 { + fn get_timestamp(&self) -> chrono::NaiveDateTime { + self.block_timestamp + } +} + +#[derive(Clone, Default, Debug, Deserialize, Serialize, PartialEq, Eq)] +pub struct CurrentAnsPrimaryNameV2 { + pub registered_address: String, + pub token_standard: String, + pub domain: Option, + pub subdomain: Option, + pub token_name: Option, + pub is_deleted: bool, + pub last_transaction_version: i64, +} + +impl Ord for CurrentAnsPrimaryNameV2 { + fn cmp(&self, other: &Self) -> std::cmp::Ordering { + self.registered_address.cmp(&other.registered_address) + } +} + +impl PartialOrd for CurrentAnsPrimaryNameV2 { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + +impl CurrentAnsPrimaryNameV2 { + pub fn get_v2_from_v1( + v1_current_primary_name: CurrentAnsPrimaryName, + v1_primary_name: AnsPrimaryName, + block_timestamp: chrono::NaiveDateTime, + ) -> (Self, AnsPrimaryNameV2) { + ( + Self { + registered_address: v1_current_primary_name.registered_address, + token_standard: TokenStandard::V1.to_string(), + domain: v1_current_primary_name.domain, + subdomain: v1_current_primary_name.subdomain, + token_name: v1_current_primary_name.token_name, + is_deleted: v1_current_primary_name.is_deleted, + last_transaction_version: v1_current_primary_name.last_transaction_version, + }, + AnsPrimaryNameV2 { + txn_version: v1_primary_name.transaction_version, + write_set_change_index: v1_primary_name.write_set_change_index, + registered_address: v1_primary_name.registered_address, + token_standard: TokenStandard::V1.to_string(), + domain: v1_primary_name.domain, + subdomain: v1_primary_name.subdomain, + token_name: v1_primary_name.token_name, + is_deleted: v1_primary_name.is_deleted, + block_timestamp, + }, + ) + } + + // Parse v2 primary name record from SetReverseLookupEvent + pub fn parse_v2_primary_name_record_from_event( + event: &Event, + txn_version: i64, + event_index: i64, + ans_v2_contract_address: &str, + block_timestamp: chrono::NaiveDateTime, + ) -> anyhow::Result> { + if let Some(set_reverse_lookup_event) = + SetReverseLookupEvent::from_event(event, ans_v2_contract_address, txn_version).unwrap() + { + if set_reverse_lookup_event.get_curr_domain_trunc().is_empty() { + // Handle case where the address's primary name is unset + return Ok(Some(( + Self { + registered_address: set_reverse_lookup_event.get_account_addr().clone(), + token_standard: TokenStandard::V2.to_string(), + domain: None, + subdomain: None, + token_name: None, + last_transaction_version: txn_version, + is_deleted: true, + }, + AnsPrimaryNameV2 { + txn_version, + write_set_change_index: -(event_index + 1), + registered_address: set_reverse_lookup_event.get_account_addr().clone(), + token_standard: TokenStandard::V2.to_string(), + domain: None, + subdomain: None, + token_name: None, + is_deleted: true, + block_timestamp, + }, + ))); + } else { + // Handle case where the address is set to a new primary name + return Ok(Some(( + Self { + registered_address: set_reverse_lookup_event.get_account_addr().clone(), + token_standard: TokenStandard::V2.to_string(), + domain: Some(set_reverse_lookup_event.get_curr_domain_trunc()), + subdomain: Some(set_reverse_lookup_event.get_curr_subdomain_trunc()), + token_name: Some(set_reverse_lookup_event.get_curr_token_name()), + last_transaction_version: txn_version, + is_deleted: false, + }, + AnsPrimaryNameV2 { + txn_version, + write_set_change_index: -(event_index + 1), + registered_address: set_reverse_lookup_event.get_account_addr().clone(), + token_standard: TokenStandard::V2.to_string(), + domain: Some(set_reverse_lookup_event.get_curr_domain_trunc()), + subdomain: Some(set_reverse_lookup_event.get_curr_subdomain_trunc()), + token_name: Some(set_reverse_lookup_event.get_curr_token_name()), + is_deleted: false, + block_timestamp, + }, + ))); + } + } + Ok(None) + } +} diff --git a/rust/processor/src/processors/mod.rs b/rust/processor/src/processors/mod.rs index aade7b7d2..6260e46ca 100644 --- a/rust/processor/src/processors/mod.rs +++ b/rust/processor/src/processors/mod.rs @@ -36,6 +36,7 @@ use crate::{ db::common::models::processor_status::ProcessorStatus, gap_detectors::ProcessingResult, processors::parquet_processors::{ + parquet_ans_processor::{ParquetAnsProcessor, ParquetAnsProcessorConfig}, parquet_default_processor::{ParquetDefaultProcessor, ParquetDefaultProcessorConfig}, parquet_fungible_asset_processor::{ ParquetFungibleAssetProcessor, ParquetFungibleAssetProcessorConfig, @@ -198,6 +199,7 @@ pub enum ProcessorConfig { ParquetDefaultProcessor(ParquetDefaultProcessorConfig), ParquetFungibleAssetProcessor(ParquetFungibleAssetProcessorConfig), ParquetTransactionMetadataProcessor(ParquetTransactionMetadataProcessorConfig), + ParquetAnsProcessor(ParquetAnsProcessorConfig), } impl ProcessorConfig { @@ -213,6 +215,7 @@ impl ProcessorConfig { ProcessorConfig::ParquetDefaultProcessor(_) | ProcessorConfig::ParquetFungibleAssetProcessor(_) | ProcessorConfig::ParquetTransactionMetadataProcessor(_) + | ProcessorConfig::ParquetAnsProcessor(_) ) } } @@ -250,6 +253,7 @@ pub enum Processor { ParquetDefaultProcessor, ParquetFungibleAssetProcessor, ParquetTransactionMetadataProcessor, + ParquetAnsProcessor, } #[cfg(test)] diff --git a/rust/processor/src/processors/parquet_processors/mod.rs b/rust/processor/src/processors/parquet_processors/mod.rs index 5700910df..5720fac02 100644 --- a/rust/processor/src/processors/parquet_processors/mod.rs +++ b/rust/processor/src/processors/parquet_processors/mod.rs @@ -1,7 +1,7 @@ use std::time::Duration; +pub mod parquet_ans_processor; pub mod parquet_default_processor; - pub mod parquet_fungible_asset_processor; pub mod parquet_transaction_metadata_processor; diff --git a/rust/processor/src/processors/parquet_processors/parquet_ans_processor.rs b/rust/processor/src/processors/parquet_processors/parquet_ans_processor.rs new file mode 100644 index 000000000..66d48ce94 --- /dev/null +++ b/rust/processor/src/processors/parquet_processors/parquet_ans_processor.rs @@ -0,0 +1,262 @@ +// Copyright © Aptos Foundation +// SPDX-License-Identifier: Apache-2.0 + +use super::{UploadIntervalConfig, GOOGLE_APPLICATION_CREDENTIALS}; +use crate::{ + bq_analytics::{ + create_parquet_handler_loop, generic_parquet_processor::ParquetDataGeneric, + ParquetProcessingResult, + }, + db::common::models::ans_models::{ + ans_lookup::CurrentAnsPrimaryName, + parquet_ans_lookup_v2::{AnsPrimaryNameV2, CurrentAnsPrimaryNameV2}, + }, + gap_detectors::ProcessingResult, + processors::{ProcessorName, ProcessorTrait}, + utils::{counters::PROCESSOR_UNKNOWN_TYPE_COUNT, database::ArcDbPool}, +}; +use ahash::AHashMap; +use anyhow::anyhow; +use aptos_protos::transaction::v1::{ + transaction::TxnData, write_set_change::Change as WriteSetChange, Transaction, +}; +use async_trait::async_trait; +use kanal::AsyncSender; +use serde::{Deserialize, Serialize}; +use std::{fmt::Debug, time::Duration}; +use tracing::error; + +#[derive(Clone, Debug, Deserialize, Serialize)] +#[serde(deny_unknown_fields)] +pub struct ParquetAnsProcessorConfig { + pub google_application_credentials: Option, + pub bucket_name: String, + pub bucket_root: String, + pub parquet_handler_response_channel_size: usize, + pub max_buffer_size: usize, + pub ans_v1_primary_names_table_handle: String, + pub ans_v1_name_records_table_handle: String, + pub ans_v2_contract_address: String, + pub parquet_upload_interval: u64, +} + +impl UploadIntervalConfig for ParquetAnsProcessorConfig { + fn parquet_upload_interval_in_secs(&self) -> Duration { + Duration::from_secs(self.parquet_upload_interval) + } +} + +pub struct ParquetAnsProcessor { + connection_pool: ArcDbPool, + config: ParquetAnsProcessorConfig, + ans_primary_name_v2_sender: AsyncSender>, +} + +impl ParquetAnsProcessor { + pub fn new( + connection_pool: ArcDbPool, + config: ParquetAnsProcessorConfig, + new_gap_detector_sender: AsyncSender, + ) -> Self { + if let Some(credentials) = config.google_application_credentials.clone() { + std::env::set_var(GOOGLE_APPLICATION_CREDENTIALS, credentials); + } + + let ans_primary_name_v2_sender = create_parquet_handler_loop::( + new_gap_detector_sender.clone(), + ProcessorName::ParquetAnsProcessor.into(), + config.bucket_name.clone(), + config.bucket_root.clone(), + config.parquet_handler_response_channel_size, + config.max_buffer_size, + config.parquet_upload_interval_in_secs(), + ); + + Self { + connection_pool, + config, + ans_primary_name_v2_sender, + } + } +} + +impl Debug for ParquetAnsProcessor { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!( + f, + "ParquetAnsProcessor {{ capacity of trnasactions channel: {:?}}}", + &self.ans_primary_name_v2_sender.capacity() + ) + } +} + +#[async_trait] +impl ProcessorTrait for ParquetAnsProcessor { + fn name(&self) -> &'static str { + ProcessorName::ParquetAnsProcessor.into() + } + + async fn process_transactions( + &self, + transactions: Vec, + start_version: u64, + end_version: u64, + _db_chain_id: Option, + ) -> anyhow::Result { + let last_transaction_timestamp = transactions.last().unwrap().timestamp.clone(); + let mut transaction_version_to_struct_count: AHashMap = AHashMap::new(); + + let all_ans_primary_names_v2 = parse_ans( + &transactions, + &mut transaction_version_to_struct_count, + self.config.ans_v1_primary_names_table_handle.clone(), + self.config.ans_v2_contract_address.clone(), + ); + + let ans_primary_name_v2_parquet_data = ParquetDataGeneric { + data: all_ans_primary_names_v2, + }; + + self.ans_primary_name_v2_sender + .send(ans_primary_name_v2_parquet_data) + .await + .map_err(|e| anyhow!("Failed to send to parquet manager: {}", e))?; + + Ok(ProcessingResult::ParquetProcessingResult( + ParquetProcessingResult { + start_version: start_version as i64, + end_version: end_version as i64, + last_transaction_timestamp: last_transaction_timestamp.clone(), + txn_version_to_struct_count: Some(transaction_version_to_struct_count), + parquet_processed_structs: None, + table_name: "".to_string(), + }, + )) + } + + fn connection_pool(&self) -> &ArcDbPool { + &self.connection_pool + } +} + +fn parse_ans( + transactions: &[Transaction], + transaction_version_to_struct_count: &mut AHashMap, + ans_v1_primary_names_table_handle: String, + ans_v2_contract_address: String, +) -> Vec { + let mut all_ans_primary_names_v2 = vec![]; + + for transaction in transactions { + let txn_version = transaction.version as i64; + let txn_data = match transaction.txn_data.as_ref() { + Some(data) => data, + None => { + PROCESSOR_UNKNOWN_TYPE_COUNT + .with_label_values(&["AnsProcessor"]) + .inc(); + tracing::warn!( + transaction_version = txn_version, + "Transaction data doesn't exist", + ); + continue; + }, + }; + let transaction_info = transaction + .info + .as_ref() + .expect("Transaction info doesn't exist!"); + let timestamp = transaction + .timestamp + .as_ref() + .expect("Transaction timestamp doesn't exist!"); + #[allow(deprecated)] + let block_timestamp = chrono::NaiveDateTime::from_timestamp_opt(timestamp.seconds, 0) + .expect("Txn Timestamp is invalid!"); + // Extracts from user transactions. Other transactions won't have any ANS changes + if let TxnData::User(user_txn) = txn_data { + // Parse V2 ANS Events. We only care about the following events: + // 1. RenewNameEvents: helps to fill in metadata for name records with updated expiration time + // 2. SetReverseLookupEvents: parse to get current_ans_primary_names + for (event_index, event) in user_txn.events.iter().enumerate() { + if let Some((_, ans_lookup_v2)) = + CurrentAnsPrimaryNameV2::parse_v2_primary_name_record_from_event( + event, + txn_version, + event_index as i64, + &ans_v2_contract_address, + block_timestamp, + ) + .unwrap() + { + all_ans_primary_names_v2.push(ans_lookup_v2); + transaction_version_to_struct_count + .entry(txn_version) + .and_modify(|e| *e += 1) + .or_insert(1); + } + } + + // Parse V1 ANS write set changes + for (wsc_index, wsc) in transaction_info.changes.iter().enumerate() { + match wsc.change.as_ref().unwrap() { + WriteSetChange::WriteTableItem(table_item) => { + if let Some((current_primary_name, primary_name)) = + CurrentAnsPrimaryName::parse_primary_name_record_from_write_table_item_v1( + table_item, + &ans_v1_primary_names_table_handle, + txn_version, + wsc_index as i64, + ) + .unwrap_or_else(|e| { + error!( + error = ?e, + "Error parsing ANS v1 primary name from write table item" + ); + panic!(); + }) + { + // Include all v1 primary names in v2 data + let (_, primary_name_v2) = + CurrentAnsPrimaryNameV2::get_v2_from_v1(current_primary_name.clone(), primary_name.clone(), block_timestamp); + all_ans_primary_names_v2.push(primary_name_v2); + transaction_version_to_struct_count + .entry(txn_version) + .and_modify(|e| *e += 1) + .or_insert(1); + } + }, + WriteSetChange::DeleteTableItem(table_item) => { + if let Some((current_primary_name, primary_name)) = + CurrentAnsPrimaryName::parse_primary_name_record_from_delete_table_item_v1( + table_item, + &ans_v1_primary_names_table_handle, + txn_version, + wsc_index as i64, + ) + .unwrap_or_else(|e| { + error!( + error = ?e, + "Error parsing ANS v1 primary name from delete table item" + ); + panic!(); + }) + { + // Include all v1 primary names in v2 data + let (_, primary_name_v2) = + CurrentAnsPrimaryNameV2::get_v2_from_v1(current_primary_name, primary_name, block_timestamp); + all_ans_primary_names_v2.push(primary_name_v2); + transaction_version_to_struct_count + .entry(txn_version) + .and_modify(|e| *e += 1) + .or_insert(1); + } + }, + _ => continue, + } + } + } + } + + all_ans_primary_names_v2 +} diff --git a/rust/processor/src/worker.rs b/rust/processor/src/worker.rs index df26c8740..11625b44e 100644 --- a/rust/processor/src/worker.rs +++ b/rust/processor/src/worker.rs @@ -19,6 +19,7 @@ use crate::{ nft_metadata_processor::NftMetadataProcessor, objects_processor::ObjectsProcessor, parquet_processors::{ + parquet_ans_processor::ParquetAnsProcessor, parquet_default_processor::ParquetDefaultProcessor, parquet_fungible_asset_processor::ParquetFungibleAssetProcessor, parquet_transaction_metadata_processor::ParquetTransactionMetadataProcessor, @@ -967,5 +968,10 @@ pub fn build_processor( gap_detector_sender.expect("Parquet processor requires a gap detector sender"), )) }, + ProcessorConfig::ParquetAnsProcessor(config) => Processor::from(ParquetAnsProcessor::new( + db_pool, + config.clone(), + gap_detector_sender.expect("Parquet processor requires a gap detector sender"), + )), } }