Skip to content

Commit

Permalink
Yuunlimm/parquet fa migrate (#509)
Browse files Browse the repository at this point in the history
  • Loading branch information
yuunlimm authored Sep 16, 2024
1 parent 31220f6 commit 84c01c1
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -43,23 +43,23 @@ pub type EventToCoinType = AHashMap<EventGuidResource, CoinType>;
Allocative, Clone, Debug, Default, Deserialize, FieldCount, ParquetRecordWriter, Serialize,
)]
pub struct FungibleAssetActivity {
pub transaction_version: i64,
pub txn_version: i64,
pub event_index: i64,
pub owner_address: Option<String>,
pub storage_id: String,
pub asset_type: Option<String>,
pub is_frozen: Option<bool>,
pub amount: Option<String>, // it is a string representation of the u128
pub type_: String,
pub event_type: String,
pub is_gas_fee: bool,
pub gas_fee_payer_address: Option<String>,
pub is_transaction_success: bool,
pub entry_function_id_str: Option<String>,
pub block_height: i64,
pub token_standard: String,
#[allocative(skip)]
pub transaction_timestamp: chrono::NaiveDateTime,
pub storage_refund_amount: u64,
pub block_timestamp: chrono::NaiveDateTime,
pub storage_refund_octa: u64,
}

impl NamedTable for FungibleAssetActivity {
Expand All @@ -68,13 +68,13 @@ impl NamedTable for FungibleAssetActivity {

impl HasVersion for FungibleAssetActivity {
fn version(&self) -> i64 {
self.transaction_version
self.txn_version
}
}

impl GetTimeStamp for FungibleAssetActivity {
fn get_timestamp(&self) -> chrono::NaiveDateTime {
self.transaction_timestamp
self.block_timestamp
}
}

Expand Down Expand Up @@ -136,22 +136,22 @@ impl FungibleAssetActivity {
.map(|fa| fa.metadata.get_reference_address());

return Ok(Some(Self {
transaction_version: txn_version,
txn_version,
event_index,
owner_address: maybe_owner_address,
storage_id: storage_id.clone(),
asset_type: maybe_asset_type,
is_frozen,
amount,
type_: event_type.clone(),
event_type: event_type.clone(),
is_gas_fee: false,
gas_fee_payer_address: None,
is_transaction_success: true,
entry_function_id_str: entry_function_id_str.clone(),
block_height,
token_standard: TokenStandard::V2.to_string(),
transaction_timestamp: txn_timestamp,
storage_refund_amount: 0,
block_timestamp: txn_timestamp,
storage_refund_octa: 0,
}));
}
Ok(None)
Expand All @@ -161,7 +161,7 @@ impl FungibleAssetActivity {
event: &Event,
txn_version: i64,
block_height: i64,
transaction_timestamp: chrono::NaiveDateTime,
block_timestamp: chrono::NaiveDateTime,
entry_function_id_str: &Option<String>,
event_to_coin_type: &EventToCoinType,
event_index: i64,
Expand Down Expand Up @@ -207,22 +207,22 @@ impl FungibleAssetActivity {
CoinInfoType::get_storage_id(coin_type.as_str(), owner_address.as_str());

Ok(Some(Self {
transaction_version: txn_version,
txn_version,
event_index,
owner_address: Some(owner_address),
storage_id,
asset_type: Some(coin_type),
is_frozen: None,
amount: Some(amount),
type_: event.type_str.clone(),
event_type: event.type_str.clone(),
is_gas_fee: false,
gas_fee_payer_address: None,
is_transaction_success: true,
entry_function_id_str: entry_function_id_str.clone(),
block_height,
token_standard: TokenStandard::V1.to_string(),
transaction_timestamp,
storage_refund_amount: 0,
block_timestamp,
storage_refund_octa: 0,
}))
} else {
Ok(None)
Expand All @@ -235,17 +235,17 @@ impl FungibleAssetActivity {
txn_info: &TransactionInfo,
user_transaction_request: &UserTransactionRequest,
entry_function_id_str: &Option<String>,
transaction_version: i64,
transaction_timestamp: chrono::NaiveDateTime,
txn_version: i64,
block_timestamp: chrono::NaiveDateTime,
block_height: i64,
fee_statement: Option<FeeStatement>,
) -> Self {
let v1_activity = CoinActivity::get_gas_event(
txn_info,
user_transaction_request,
entry_function_id_str,
transaction_version,
transaction_timestamp,
txn_version,
block_timestamp,
block_height,
fee_statement,
);
Expand All @@ -254,22 +254,22 @@ impl FungibleAssetActivity {
v1_activity.owner_address.as_str(),
);
Self {
transaction_version,
txn_version,
event_index: v1_activity.event_index.unwrap(),
owner_address: Some(v1_activity.owner_address),
storage_id,
asset_type: Some(v1_activity.coin_type),
is_frozen: None,
amount: Some(v1_activity.amount.to_string()),
type_: v1_activity.activity_type,
event_type: v1_activity.activity_type,
is_gas_fee: v1_activity.is_gas_fee,
gas_fee_payer_address: v1_activity.gas_fee_payer_address,
is_transaction_success: v1_activity.is_transaction_success,
entry_function_id_str: v1_activity.entry_function_id_str,
block_height,
token_standard: TokenStandard::V1.to_string(),
transaction_timestamp,
storage_refund_amount: bigdecimal_to_u64(&v1_activity.storage_refund_amount),
block_timestamp,
storage_refund_octa: bigdecimal_to_u64(&v1_activity.storage_refund_amount),
}
}
}
6 changes: 5 additions & 1 deletion rust/processor/src/processors/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@ use crate::{
parquet_ans_processor::{ParquetAnsProcessor, ParquetAnsProcessorConfig},
parquet_default_processor::{ParquetDefaultProcessor, ParquetDefaultProcessorConfig},
parquet_events_processor::{ParquetEventsProcessor, ParquetEventsProcessorConfig},
parquet_fungible_asset_activities_processor::{
ParquetFungibleAssetActivitiesProcessor, ParquetFungibleAssetActivitiesProcessorConfig,
},
parquet_fungible_asset_processor::{
ParquetFungibleAssetProcessor, ParquetFungibleAssetProcessorConfig,
},
Expand All @@ -58,7 +61,6 @@ use aptos_protos::transaction::v1::Transaction as ProtoTransaction;
use async_trait::async_trait;
use diesel::{pg::upsert::excluded, ExpressionMethods};
use enum_dispatch::enum_dispatch;
use parquet_processors::parquet_fungible_asset_activities_processor::ParquetFungibleAssetActivitiesProcessorConfig;
use serde::{Deserialize, Serialize};
use std::fmt::Debug;

Expand Down Expand Up @@ -226,6 +228,7 @@ impl ProcessorConfig {
| ProcessorConfig::ParquetAnsProcessor(_)
| ProcessorConfig::ParquetEventsProcessor(_)
| ProcessorConfig::ParquetTokenV2Processor(_)
| ProcessorConfig::ParquetFungibleAssetActivitiesProcessor(_)
)
}
}
Expand Down Expand Up @@ -265,6 +268,7 @@ pub enum Processor {
// Parquet processors
ParquetDefaultProcessor,
ParquetFungibleAssetProcessor,
ParquetFungibleAssetActivitiesProcessor,
ParquetTransactionMetadataProcessor,
ParquetAnsProcessor,
ParquetEventsProcessor,
Expand Down
8 changes: 8 additions & 0 deletions rust/processor/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use crate::{
parquet_ans_processor::ParquetAnsProcessor,
parquet_default_processor::ParquetDefaultProcessor,
parquet_events_processor::ParquetEventsProcessor,
parquet_fungible_asset_activities_processor::ParquetFungibleAssetActivitiesProcessor,
parquet_fungible_asset_processor::ParquetFungibleAssetProcessor,
parquet_token_v2_processor::ParquetTokenV2Processor,
parquet_transaction_metadata_processor::ParquetTransactionMetadataProcessor,
Expand Down Expand Up @@ -1004,5 +1005,12 @@ pub fn build_processor(
config.clone(),
gap_detector_sender.expect("Parquet processor requires a gap detector sender"),
)),
ProcessorConfig::ParquetFungibleAssetActivitiesProcessor(config) => {
Processor::from(ParquetFungibleAssetActivitiesProcessor::new(
db_pool,
config.clone(),
gap_detector_sender.expect("Parquet processor requires a gap detector sender"),
))
},
}
}

0 comments on commit 84c01c1

Please sign in to comment.