diff --git a/rust/processor/src/db/common/models/token_v2_models/mod.rs b/rust/processor/src/db/common/models/token_v2_models/mod.rs index 642249582..9046023e2 100644 --- a/rust/processor/src/db/common/models/token_v2_models/mod.rs +++ b/rust/processor/src/db/common/models/token_v2_models/mod.rs @@ -1,3 +1,4 @@ pub mod raw_token_claims; pub mod raw_v1_token_royalty; +pub mod raw_v2_token_activities; pub mod raw_v2_token_metadata; diff --git a/rust/processor/src/db/common/models/token_v2_models/raw_v2_token_activities.rs b/rust/processor/src/db/common/models/token_v2_models/raw_v2_token_activities.rs new file mode 100644 index 000000000..58670c972 --- /dev/null +++ b/rust/processor/src/db/common/models/token_v2_models/raw_v2_token_activities.rs @@ -0,0 +1,343 @@ +// Copyright © Aptos Foundation +// SPDX-License-Identifier: Apache-2.0 + +// This is required because a diesel macro makes clippy sad +#![allow(clippy::extra_unused_lifetimes)] +#![allow(clippy::unused_unit)] + +use crate::{ + db::postgres::models::{ + object_models::v2_object_utils::ObjectAggregatedDataMapping, + token_models::token_utils::{TokenDataIdType, TokenEvent}, + token_v2_models::v2_token_utils::{TokenStandard, V2TokenEvent}, + }, + utils::util::standardize_address, +}; +use aptos_protos::transaction::v1::Event; +use bigdecimal::{BigDecimal, One, Zero}; +use serde::{Deserialize, Serialize}; + +#[derive(Clone, Debug, Deserialize, Serialize)] +pub struct RawTokenActivityV2 { + pub transaction_version: i64, + pub event_index: i64, + pub event_account_address: String, + pub token_data_id: String, + pub property_version_v1: BigDecimal, + pub type_: String, + pub from_address: Option, + pub to_address: Option, + pub token_amount: BigDecimal, + pub before_value: Option, + pub after_value: Option, + pub entry_function_id_str: Option, + pub token_standard: String, + pub is_fungible_v2: Option, + pub transaction_timestamp: chrono::NaiveDateTime, +} + +/// A simplified TokenActivity (excluded common fields) to reduce code duplication +struct TokenActivityHelperV1 { + pub token_data_id_struct: TokenDataIdType, + pub property_version: BigDecimal, + pub from_address: Option, + pub to_address: Option, + pub token_amount: BigDecimal, +} + +/// A simplified TokenActivity (excluded common fields) to reduce code duplication +struct TokenActivityHelperV2 { + pub from_address: Option, + pub to_address: Option, + pub token_amount: BigDecimal, + pub before_value: Option, + pub after_value: Option, + pub event_type: String, +} + +impl RawTokenActivityV2 { + pub async fn get_nft_v2_from_parsed_event( + event: &Event, + txn_version: i64, + txn_timestamp: chrono::NaiveDateTime, + event_index: i64, + entry_function_id_str: &Option, + token_v2_metadata: &ObjectAggregatedDataMapping, + ) -> anyhow::Result> { + let event_type = event.type_str.clone(); + if let Some(token_event) = + &V2TokenEvent::from_event(&event_type, event.data.as_str(), txn_version)? + { + let event_account_address = + standardize_address(&event.key.as_ref().unwrap().account_address); + // burn and mint events are attached to the collection. The rest should be attached to the token + let token_data_id = match token_event { + V2TokenEvent::MintEvent(inner) => inner.get_token_address(), + V2TokenEvent::Mint(inner) => inner.get_token_address(), + V2TokenEvent::BurnEvent(inner) => inner.get_token_address(), + V2TokenEvent::Burn(inner) => inner.get_token_address(), + V2TokenEvent::TransferEvent(inner) => inner.get_object_address(), + _ => event_account_address.clone(), + }; + + if let Some(metadata) = token_v2_metadata.get(&token_data_id) { + let object_core = &metadata.object.object_core; + let token_activity_helper = match token_event { + V2TokenEvent::MintEvent(_) => TokenActivityHelperV2 { + from_address: Some(object_core.get_owner_address()), + to_address: None, + token_amount: BigDecimal::one(), + before_value: None, + after_value: None, + event_type: event_type.clone(), + }, + V2TokenEvent::Mint(_) => TokenActivityHelperV2 { + from_address: Some(object_core.get_owner_address()), + to_address: None, + token_amount: BigDecimal::one(), + before_value: None, + after_value: None, + event_type: "0x4::collection::MintEvent".to_string(), + }, + V2TokenEvent::TokenMutationEvent(inner) => TokenActivityHelperV2 { + from_address: Some(object_core.get_owner_address()), + to_address: None, + token_amount: BigDecimal::zero(), + before_value: Some(inner.old_value.clone()), + after_value: Some(inner.new_value.clone()), + event_type: event_type.clone(), + }, + V2TokenEvent::TokenMutation(inner) => TokenActivityHelperV2 { + from_address: Some(inner.token_address.clone()), + to_address: None, + token_amount: BigDecimal::zero(), + before_value: Some(inner.old_value.clone()), + after_value: Some(inner.new_value.clone()), + event_type: "0x4::collection::MutationEvent".to_string(), + }, + V2TokenEvent::BurnEvent(_) => TokenActivityHelperV2 { + from_address: Some(object_core.get_owner_address()), + to_address: None, + token_amount: BigDecimal::one(), + before_value: None, + after_value: None, + event_type: event_type.clone(), + }, + V2TokenEvent::Burn(_) => TokenActivityHelperV2 { + from_address: Some(object_core.get_owner_address()), + to_address: None, + token_amount: BigDecimal::one(), + before_value: None, + after_value: None, + event_type: "0x4::collection::BurnEvent".to_string(), + }, + V2TokenEvent::TransferEvent(inner) => TokenActivityHelperV2 { + from_address: Some(inner.get_from_address()), + to_address: Some(inner.get_to_address()), + token_amount: BigDecimal::one(), + before_value: None, + after_value: None, + event_type: event_type.clone(), + }, + }; + return Ok(Some(Self { + transaction_version: txn_version, + event_index, + event_account_address, + token_data_id, + property_version_v1: BigDecimal::zero(), + type_: token_activity_helper.event_type, + from_address: token_activity_helper.from_address, + to_address: token_activity_helper.to_address, + token_amount: token_activity_helper.token_amount, + before_value: token_activity_helper.before_value, + after_value: token_activity_helper.after_value, + entry_function_id_str: entry_function_id_str.clone(), + token_standard: TokenStandard::V2.to_string(), + is_fungible_v2: None, + transaction_timestamp: txn_timestamp, + })); + } else { + // If the object metadata isn't found in the transaction, then the token was burnt. + + // the new burn event has owner address now! + let owner_address = if let V2TokenEvent::Burn(inner) = token_event { + inner.get_previous_owner_address() + } else { + // To handle a case with the old burn events, when a token is minted and burnt in the same transaction + None + }; + + return Ok(Some(Self { + transaction_version: txn_version, + event_index, + event_account_address, + token_data_id, + property_version_v1: BigDecimal::zero(), + type_: event_type, + from_address: owner_address.clone(), + to_address: None, + token_amount: BigDecimal::one(), + before_value: None, + after_value: None, + entry_function_id_str: entry_function_id_str.clone(), + token_standard: TokenStandard::V2.to_string(), + is_fungible_v2: None, + transaction_timestamp: txn_timestamp, + })); + } + } + Ok(None) + } + + pub fn get_v1_from_parsed_event( + event: &Event, + txn_version: i64, + txn_timestamp: chrono::NaiveDateTime, + event_index: i64, + entry_function_id_str: &Option, + ) -> anyhow::Result> { + let event_type = event.type_str.clone(); + if let Some(token_event) = &TokenEvent::from_event(&event_type, &event.data, txn_version)? { + let event_account_address = + standardize_address(&event.key.as_ref().unwrap().account_address); + let token_activity_helper = match token_event { + TokenEvent::MintTokenEvent(inner) => TokenActivityHelperV1 { + token_data_id_struct: inner.id.clone(), + property_version: BigDecimal::zero(), + from_address: Some(event_account_address.clone()), + to_address: None, + token_amount: inner.amount.clone(), + }, + TokenEvent::Mint(inner) => TokenActivityHelperV1 { + token_data_id_struct: inner.id.clone(), + property_version: BigDecimal::zero(), + from_address: Some(inner.get_account()), + to_address: None, + token_amount: inner.amount.clone(), + }, + TokenEvent::BurnTokenEvent(inner) => TokenActivityHelperV1 { + token_data_id_struct: inner.id.token_data_id.clone(), + property_version: inner.id.property_version.clone(), + from_address: Some(event_account_address.clone()), + to_address: None, + token_amount: inner.amount.clone(), + }, + TokenEvent::Burn(inner) => TokenActivityHelperV1 { + token_data_id_struct: inner.id.token_data_id.clone(), + property_version: inner.id.property_version.clone(), + from_address: Some(inner.get_account()), + to_address: None, + token_amount: inner.amount.clone(), + }, + TokenEvent::MutateTokenPropertyMapEvent(inner) => TokenActivityHelperV1 { + token_data_id_struct: inner.new_id.token_data_id.clone(), + property_version: inner.new_id.property_version.clone(), + from_address: Some(event_account_address.clone()), + to_address: None, + token_amount: BigDecimal::zero(), + }, + TokenEvent::MutatePropertyMap(inner) => TokenActivityHelperV1 { + token_data_id_struct: inner.new_id.token_data_id.clone(), + property_version: inner.new_id.property_version.clone(), + from_address: Some(inner.get_account()), + to_address: None, + token_amount: BigDecimal::zero(), + }, + TokenEvent::WithdrawTokenEvent(inner) => TokenActivityHelperV1 { + token_data_id_struct: inner.id.token_data_id.clone(), + property_version: inner.id.property_version.clone(), + from_address: Some(event_account_address.clone()), + to_address: None, + token_amount: inner.amount.clone(), + }, + TokenEvent::TokenWithdraw(inner) => TokenActivityHelperV1 { + token_data_id_struct: inner.id.token_data_id.clone(), + property_version: inner.id.property_version.clone(), + from_address: Some(inner.get_account()), + to_address: None, + token_amount: inner.amount.clone(), + }, + TokenEvent::DepositTokenEvent(inner) => TokenActivityHelperV1 { + token_data_id_struct: inner.id.token_data_id.clone(), + property_version: inner.id.property_version.clone(), + from_address: None, + to_address: Some(standardize_address(&event_account_address)), + token_amount: inner.amount.clone(), + }, + TokenEvent::TokenDeposit(inner) => TokenActivityHelperV1 { + token_data_id_struct: inner.id.token_data_id.clone(), + property_version: inner.id.property_version.clone(), + from_address: None, + to_address: Some(inner.get_account()), + token_amount: inner.amount.clone(), + }, + TokenEvent::OfferTokenEvent(inner) => TokenActivityHelperV1 { + token_data_id_struct: inner.token_id.token_data_id.clone(), + property_version: inner.token_id.property_version.clone(), + from_address: Some(event_account_address.clone()), + to_address: Some(inner.get_to_address()), + token_amount: inner.amount.clone(), + }, + TokenEvent::CancelTokenOfferEvent(inner) => TokenActivityHelperV1 { + token_data_id_struct: inner.token_id.token_data_id.clone(), + property_version: inner.token_id.property_version.clone(), + from_address: Some(event_account_address.clone()), + to_address: Some(inner.get_to_address()), + token_amount: inner.amount.clone(), + }, + TokenEvent::ClaimTokenEvent(inner) => TokenActivityHelperV1 { + token_data_id_struct: inner.token_id.token_data_id.clone(), + property_version: inner.token_id.property_version.clone(), + from_address: Some(event_account_address.clone()), + to_address: Some(inner.get_to_address()), + token_amount: inner.amount.clone(), + }, + TokenEvent::Offer(inner) => TokenActivityHelperV1 { + token_data_id_struct: inner.token_id.token_data_id.clone(), + property_version: inner.token_id.property_version.clone(), + from_address: Some(inner.get_from_address()), + to_address: Some(inner.get_to_address()), + token_amount: inner.amount.clone(), + }, + TokenEvent::CancelOffer(inner) => TokenActivityHelperV1 { + token_data_id_struct: inner.token_id.token_data_id.clone(), + property_version: inner.token_id.property_version.clone(), + from_address: Some(inner.get_from_address()), + to_address: Some(inner.get_to_address()), + token_amount: inner.amount.clone(), + }, + TokenEvent::Claim(inner) => TokenActivityHelperV1 { + token_data_id_struct: inner.token_id.token_data_id.clone(), + property_version: inner.token_id.property_version.clone(), + from_address: Some(inner.get_from_address()), + to_address: Some(inner.get_to_address()), + token_amount: inner.amount.clone(), + }, + }; + let token_data_id_struct = token_activity_helper.token_data_id_struct; + return Ok(Some(Self { + transaction_version: txn_version, + event_index, + event_account_address, + token_data_id: token_data_id_struct.to_id(), + property_version_v1: token_activity_helper.property_version, + type_: event_type, + from_address: token_activity_helper.from_address, + to_address: token_activity_helper.to_address, + token_amount: token_activity_helper.token_amount, + before_value: None, + after_value: None, + entry_function_id_str: entry_function_id_str.clone(), + token_standard: TokenStandard::V1.to_string(), + is_fungible_v2: None, + transaction_timestamp: txn_timestamp, + })); + } + Ok(None) + } +} + +pub trait TokenActivityV2Convertible { + fn from_raw(raw_item: RawTokenActivityV2) -> Self; +} diff --git a/rust/processor/src/db/parquet/models/token_v2_models/mod.rs b/rust/processor/src/db/parquet/models/token_v2_models/mod.rs index 947fabf71..0d1a81185 100644 --- a/rust/processor/src/db/parquet/models/token_v2_models/mod.rs +++ b/rust/processor/src/db/parquet/models/token_v2_models/mod.rs @@ -1,3 +1,6 @@ pub mod token_claims; pub mod v1_token_royalty; +pub mod v2_token_activities; pub mod v2_token_metadata; + +pub mod v2_token_ownerships; diff --git a/rust/processor/src/db/parquet/models/token_v2_models/v2_token_activities.rs b/rust/processor/src/db/parquet/models/token_v2_models/v2_token_activities.rs new file mode 100644 index 000000000..36b6df2fb --- /dev/null +++ b/rust/processor/src/db/parquet/models/token_v2_models/v2_token_activities.rs @@ -0,0 +1,79 @@ +// Copyright © Aptos Foundation +// SPDX-License-Identifier: Apache-2.0 + +// This is required because a diesel macro makes clippy sad +#![allow(clippy::extra_unused_lifetimes)] +#![allow(clippy::unused_unit)] + +use crate::{ + bq_analytics::generic_parquet_processor::{GetTimeStamp, HasVersion, NamedTable}, + db::common::models::token_v2_models::raw_v2_token_activities::{ + RawTokenActivityV2, TokenActivityV2Convertible, + }, +}; +use allocative_derive::Allocative; +use bigdecimal::ToPrimitive; +use field_count::FieldCount; +use parquet_derive::ParquetRecordWriter; +use serde::{Deserialize, Serialize}; + +#[derive( + Allocative, Clone, Debug, Default, Deserialize, FieldCount, ParquetRecordWriter, Serialize, +)] +pub struct TokenActivityV2 { + pub txn_version: i64, + pub event_index: i64, + pub event_account_address: String, + pub token_data_id: String, + pub property_version_v1: u64, // BigDecimal + pub type_: String, + pub from_address: Option, + pub to_address: Option, + pub token_amount: String, // BigDecimal + pub before_value: Option, + pub after_value: Option, + pub entry_function_id_str: Option, + pub token_standard: String, + pub is_fungible_v2: Option, + #[allocative(skip)] + pub block_timestamp: chrono::NaiveDateTime, +} + +impl NamedTable for TokenActivityV2 { + const TABLE_NAME: &'static str = "token_activities_v2"; +} + +impl HasVersion for TokenActivityV2 { + fn version(&self) -> i64 { + self.txn_version + } +} + +impl GetTimeStamp for TokenActivityV2 { + fn get_timestamp(&self) -> chrono::NaiveDateTime { + self.block_timestamp + } +} + +impl TokenActivityV2Convertible for TokenActivityV2 { + // TODO: consider returning a Result + fn from_raw(raw_item: RawTokenActivityV2) -> Self { + Self { + txn_version: raw_item.transaction_version, + event_index: raw_item.event_index, + event_account_address: raw_item.event_account_address, + token_data_id: raw_item.token_data_id, + property_version_v1: raw_item.property_version_v1.to_u64().unwrap(), + type_: raw_item.type_, + from_address: raw_item.from_address, + to_address: raw_item.to_address, + token_amount: raw_item.token_amount.to_string(), + before_value: raw_item.before_value, + after_value: raw_item.after_value, + entry_function_id_str: raw_item.entry_function_id_str, + token_standard: raw_item.token_standard, + is_fungible_v2: raw_item.is_fungible_v2, + block_timestamp: raw_item.transaction_timestamp, + } + } +} diff --git a/rust/processor/src/db/postgres/models/token_v2_models/parquet_v2_token_ownerships.rs b/rust/processor/src/db/parquet/models/token_v2_models/v2_token_ownerships.rs similarity index 100% rename from rust/processor/src/db/postgres/models/token_v2_models/parquet_v2_token_ownerships.rs rename to rust/processor/src/db/parquet/models/token_v2_models/v2_token_ownerships.rs diff --git a/rust/processor/src/db/postgres/models/token_v2_models/mod.rs b/rust/processor/src/db/postgres/models/token_v2_models/mod.rs index 3a2aa0567..568250b2d 100644 --- a/rust/processor/src/db/postgres/models/token_v2_models/mod.rs +++ b/rust/processor/src/db/postgres/models/token_v2_models/mod.rs @@ -12,4 +12,3 @@ pub mod v2_token_utils; // parquet models // pub mod parquet_v2_collections; // revisit this pub mod parquet_v2_token_datas; -pub mod parquet_v2_token_ownerships; diff --git a/rust/processor/src/db/postgres/models/token_v2_models/v2_token_activities.rs b/rust/processor/src/db/postgres/models/token_v2_models/v2_token_activities.rs index 6121baf07..038c80381 100644 --- a/rust/processor/src/db/postgres/models/token_v2_models/v2_token_activities.rs +++ b/rust/processor/src/db/postgres/models/token_v2_models/v2_token_activities.rs @@ -5,17 +5,13 @@ #![allow(clippy::extra_unused_lifetimes)] #![allow(clippy::unused_unit)] -use super::v2_token_utils::{TokenStandard, V2TokenEvent}; use crate::{ - db::postgres::models::{ - object_models::v2_object_utils::ObjectAggregatedDataMapping, - token_models::token_utils::{TokenDataIdType, TokenEvent}, + db::common::models::token_v2_models::raw_v2_token_activities::{ + RawTokenActivityV2, TokenActivityV2Convertible, }, schema::token_activities_v2, - utils::util::standardize_address, }; -use aptos_protos::transaction::v1::Event; -use bigdecimal::{BigDecimal, One, Zero}; +use bigdecimal::BigDecimal; use field_count::FieldCount; use serde::{Deserialize, Serialize}; @@ -40,304 +36,24 @@ pub struct TokenActivityV2 { pub transaction_timestamp: chrono::NaiveDateTime, } -/// A simplified TokenActivity (excluded common fields) to reduce code duplication -struct TokenActivityHelperV1 { - pub token_data_id_struct: TokenDataIdType, - pub property_version: BigDecimal, - pub from_address: Option, - pub to_address: Option, - pub token_amount: BigDecimal, -} - -/// A simplified TokenActivity (excluded common fields) to reduce code duplication -struct TokenActivityHelperV2 { - pub from_address: Option, - pub to_address: Option, - pub token_amount: BigDecimal, - pub before_value: Option, - pub after_value: Option, - pub event_type: String, -} - -impl TokenActivityV2 { - pub async fn get_nft_v2_from_parsed_event( - event: &Event, - txn_version: i64, - txn_timestamp: chrono::NaiveDateTime, - event_index: i64, - entry_function_id_str: &Option, - token_v2_metadata: &ObjectAggregatedDataMapping, - ) -> anyhow::Result> { - let event_type = event.type_str.clone(); - if let Some(token_event) = - &V2TokenEvent::from_event(&event_type, event.data.as_str(), txn_version)? - { - let event_account_address = - standardize_address(&event.key.as_ref().unwrap().account_address); - // burn and mint events are attached to the collection. The rest should be attached to the token - let token_data_id = match token_event { - V2TokenEvent::MintEvent(inner) => inner.get_token_address(), - V2TokenEvent::Mint(inner) => inner.get_token_address(), - V2TokenEvent::BurnEvent(inner) => inner.get_token_address(), - V2TokenEvent::Burn(inner) => inner.get_token_address(), - V2TokenEvent::TransferEvent(inner) => inner.get_object_address(), - _ => event_account_address.clone(), - }; - - if let Some(metadata) = token_v2_metadata.get(&token_data_id) { - let object_core = &metadata.object.object_core; - let token_activity_helper = match token_event { - V2TokenEvent::MintEvent(_) => TokenActivityHelperV2 { - from_address: Some(object_core.get_owner_address()), - to_address: None, - token_amount: BigDecimal::one(), - before_value: None, - after_value: None, - event_type: event_type.clone(), - }, - V2TokenEvent::Mint(_) => TokenActivityHelperV2 { - from_address: Some(object_core.get_owner_address()), - to_address: None, - token_amount: BigDecimal::one(), - before_value: None, - after_value: None, - event_type: "0x4::collection::MintEvent".to_string(), - }, - V2TokenEvent::TokenMutationEvent(inner) => TokenActivityHelperV2 { - from_address: Some(object_core.get_owner_address()), - to_address: None, - token_amount: BigDecimal::zero(), - before_value: Some(inner.old_value.clone()), - after_value: Some(inner.new_value.clone()), - event_type: event_type.clone(), - }, - V2TokenEvent::TokenMutation(inner) => TokenActivityHelperV2 { - from_address: Some(inner.token_address.clone()), - to_address: None, - token_amount: BigDecimal::zero(), - before_value: Some(inner.old_value.clone()), - after_value: Some(inner.new_value.clone()), - event_type: "0x4::collection::MutationEvent".to_string(), - }, - V2TokenEvent::BurnEvent(_) => TokenActivityHelperV2 { - from_address: Some(object_core.get_owner_address()), - to_address: None, - token_amount: BigDecimal::one(), - before_value: None, - after_value: None, - event_type: event_type.clone(), - }, - V2TokenEvent::Burn(_) => TokenActivityHelperV2 { - from_address: Some(object_core.get_owner_address()), - to_address: None, - token_amount: BigDecimal::one(), - before_value: None, - after_value: None, - event_type: "0x4::collection::BurnEvent".to_string(), - }, - V2TokenEvent::TransferEvent(inner) => TokenActivityHelperV2 { - from_address: Some(inner.get_from_address()), - to_address: Some(inner.get_to_address()), - token_amount: BigDecimal::one(), - before_value: None, - after_value: None, - event_type: event_type.clone(), - }, - }; - return Ok(Some(Self { - transaction_version: txn_version, - event_index, - event_account_address, - token_data_id, - property_version_v1: BigDecimal::zero(), - type_: token_activity_helper.event_type, - from_address: token_activity_helper.from_address, - to_address: token_activity_helper.to_address, - token_amount: token_activity_helper.token_amount, - before_value: token_activity_helper.before_value, - after_value: token_activity_helper.after_value, - entry_function_id_str: entry_function_id_str.clone(), - token_standard: TokenStandard::V2.to_string(), - is_fungible_v2: None, - transaction_timestamp: txn_timestamp, - })); - } else { - // If the object metadata isn't found in the transaction, then the token was burnt. - - // the new burn event has owner address now! - let owner_address = if let V2TokenEvent::Burn(inner) = token_event { - inner.get_previous_owner_address() - } else { - // To handle a case with the old burn events, when a token is minted and burnt in the same transaction - None - }; - - return Ok(Some(Self { - transaction_version: txn_version, - event_index, - event_account_address, - token_data_id, - property_version_v1: BigDecimal::zero(), - type_: event_type, - from_address: owner_address.clone(), - to_address: None, - token_amount: BigDecimal::one(), - before_value: None, - after_value: None, - entry_function_id_str: entry_function_id_str.clone(), - token_standard: TokenStandard::V2.to_string(), - is_fungible_v2: None, - transaction_timestamp: txn_timestamp, - })); - } - } - Ok(None) - } - - pub fn get_v1_from_parsed_event( - event: &Event, - txn_version: i64, - txn_timestamp: chrono::NaiveDateTime, - event_index: i64, - entry_function_id_str: &Option, - ) -> anyhow::Result> { - let event_type = event.type_str.clone(); - if let Some(token_event) = &TokenEvent::from_event(&event_type, &event.data, txn_version)? { - let event_account_address = - standardize_address(&event.key.as_ref().unwrap().account_address); - let token_activity_helper = match token_event { - TokenEvent::MintTokenEvent(inner) => TokenActivityHelperV1 { - token_data_id_struct: inner.id.clone(), - property_version: BigDecimal::zero(), - from_address: Some(event_account_address.clone()), - to_address: None, - token_amount: inner.amount.clone(), - }, - TokenEvent::Mint(inner) => TokenActivityHelperV1 { - token_data_id_struct: inner.id.clone(), - property_version: BigDecimal::zero(), - from_address: Some(inner.get_account()), - to_address: None, - token_amount: inner.amount.clone(), - }, - TokenEvent::BurnTokenEvent(inner) => TokenActivityHelperV1 { - token_data_id_struct: inner.id.token_data_id.clone(), - property_version: inner.id.property_version.clone(), - from_address: Some(event_account_address.clone()), - to_address: None, - token_amount: inner.amount.clone(), - }, - TokenEvent::Burn(inner) => TokenActivityHelperV1 { - token_data_id_struct: inner.id.token_data_id.clone(), - property_version: inner.id.property_version.clone(), - from_address: Some(inner.get_account()), - to_address: None, - token_amount: inner.amount.clone(), - }, - TokenEvent::MutateTokenPropertyMapEvent(inner) => TokenActivityHelperV1 { - token_data_id_struct: inner.new_id.token_data_id.clone(), - property_version: inner.new_id.property_version.clone(), - from_address: Some(event_account_address.clone()), - to_address: None, - token_amount: BigDecimal::zero(), - }, - TokenEvent::MutatePropertyMap(inner) => TokenActivityHelperV1 { - token_data_id_struct: inner.new_id.token_data_id.clone(), - property_version: inner.new_id.property_version.clone(), - from_address: Some(inner.get_account()), - to_address: None, - token_amount: BigDecimal::zero(), - }, - TokenEvent::WithdrawTokenEvent(inner) => TokenActivityHelperV1 { - token_data_id_struct: inner.id.token_data_id.clone(), - property_version: inner.id.property_version.clone(), - from_address: Some(event_account_address.clone()), - to_address: None, - token_amount: inner.amount.clone(), - }, - TokenEvent::TokenWithdraw(inner) => TokenActivityHelperV1 { - token_data_id_struct: inner.id.token_data_id.clone(), - property_version: inner.id.property_version.clone(), - from_address: Some(inner.get_account()), - to_address: None, - token_amount: inner.amount.clone(), - }, - TokenEvent::DepositTokenEvent(inner) => TokenActivityHelperV1 { - token_data_id_struct: inner.id.token_data_id.clone(), - property_version: inner.id.property_version.clone(), - from_address: None, - to_address: Some(standardize_address(&event_account_address)), - token_amount: inner.amount.clone(), - }, - TokenEvent::TokenDeposit(inner) => TokenActivityHelperV1 { - token_data_id_struct: inner.id.token_data_id.clone(), - property_version: inner.id.property_version.clone(), - from_address: None, - to_address: Some(inner.get_account()), - token_amount: inner.amount.clone(), - }, - TokenEvent::OfferTokenEvent(inner) => TokenActivityHelperV1 { - token_data_id_struct: inner.token_id.token_data_id.clone(), - property_version: inner.token_id.property_version.clone(), - from_address: Some(event_account_address.clone()), - to_address: Some(inner.get_to_address()), - token_amount: inner.amount.clone(), - }, - TokenEvent::CancelTokenOfferEvent(inner) => TokenActivityHelperV1 { - token_data_id_struct: inner.token_id.token_data_id.clone(), - property_version: inner.token_id.property_version.clone(), - from_address: Some(event_account_address.clone()), - to_address: Some(inner.get_to_address()), - token_amount: inner.amount.clone(), - }, - TokenEvent::ClaimTokenEvent(inner) => TokenActivityHelperV1 { - token_data_id_struct: inner.token_id.token_data_id.clone(), - property_version: inner.token_id.property_version.clone(), - from_address: Some(event_account_address.clone()), - to_address: Some(inner.get_to_address()), - token_amount: inner.amount.clone(), - }, - TokenEvent::Offer(inner) => TokenActivityHelperV1 { - token_data_id_struct: inner.token_id.token_data_id.clone(), - property_version: inner.token_id.property_version.clone(), - from_address: Some(inner.get_from_address()), - to_address: Some(inner.get_to_address()), - token_amount: inner.amount.clone(), - }, - TokenEvent::CancelOffer(inner) => TokenActivityHelperV1 { - token_data_id_struct: inner.token_id.token_data_id.clone(), - property_version: inner.token_id.property_version.clone(), - from_address: Some(inner.get_from_address()), - to_address: Some(inner.get_to_address()), - token_amount: inner.amount.clone(), - }, - TokenEvent::Claim(inner) => TokenActivityHelperV1 { - token_data_id_struct: inner.token_id.token_data_id.clone(), - property_version: inner.token_id.property_version.clone(), - from_address: Some(inner.get_from_address()), - to_address: Some(inner.get_to_address()), - token_amount: inner.amount.clone(), - }, - }; - let token_data_id_struct = token_activity_helper.token_data_id_struct; - return Ok(Some(Self { - transaction_version: txn_version, - event_index, - event_account_address, - token_data_id: token_data_id_struct.to_id(), - property_version_v1: token_activity_helper.property_version, - type_: event_type, - from_address: token_activity_helper.from_address, - to_address: token_activity_helper.to_address, - token_amount: token_activity_helper.token_amount, - before_value: None, - after_value: None, - entry_function_id_str: entry_function_id_str.clone(), - token_standard: TokenStandard::V1.to_string(), - is_fungible_v2: None, - transaction_timestamp: txn_timestamp, - })); +impl TokenActivityV2Convertible for TokenActivityV2 { + fn from_raw(raw_item: RawTokenActivityV2) -> Self { + Self { + transaction_version: raw_item.transaction_version, + event_index: raw_item.event_index, + event_account_address: raw_item.event_account_address, + token_data_id: raw_item.token_data_id, + property_version_v1: raw_item.property_version_v1, + type_: raw_item.type_, + from_address: raw_item.from_address, + to_address: raw_item.to_address, + token_amount: raw_item.token_amount, + before_value: raw_item.before_value, + after_value: raw_item.after_value, + entry_function_id_str: raw_item.entry_function_id_str, + token_standard: raw_item.token_standard, + is_fungible_v2: raw_item.is_fungible_v2, + transaction_timestamp: raw_item.transaction_timestamp, } - Ok(None) } } diff --git a/rust/processor/src/processors/parquet_processors/parquet_token_v2_processor.rs b/rust/processor/src/processors/parquet_processors/parquet_token_v2_processor.rs index 9d544e713..0b16ecc51 100644 --- a/rust/processor/src/processors/parquet_processors/parquet_token_v2_processor.rs +++ b/rust/processor/src/processors/parquet_processors/parquet_token_v2_processor.rs @@ -7,19 +7,21 @@ use crate::{ create_parquet_handler_loop, generic_parquet_processor::ParquetDataGeneric, ParquetProcessingResult, }, - db::postgres::models::{ - fungible_asset_models::v2_fungible_asset_utils::FungibleAssetMetadata, - object_models::v2_object_utils::{ - ObjectAggregatedData, ObjectAggregatedDataMapping, ObjectWithMetadata, - }, - resources::{FromWriteResource, V2TokenResource}, - token_models::tokens::{TableHandleToOwner, TableMetadataForToken}, - token_v2_models::{ - parquet_v2_token_datas::TokenDataV2, - parquet_v2_token_ownerships::TokenOwnershipV2, - v2_token_ownerships::NFTOwnershipV2, - v2_token_utils::{ - Burn, BurnEvent, MintEvent, TokenV2Burned, TokenV2Minted, TransferEvent, + db::{ + parquet::models::token_v2_models::v2_token_ownerships::TokenOwnershipV2, + postgres::models::{ + fungible_asset_models::v2_fungible_asset_utils::FungibleAssetMetadata, + object_models::v2_object_utils::{ + ObjectAggregatedData, ObjectAggregatedDataMapping, ObjectWithMetadata, + }, + resources::{FromWriteResource, V2TokenResource}, + token_models::tokens::{TableHandleToOwner, TableMetadataForToken}, + token_v2_models::{ + parquet_v2_token_datas::TokenDataV2, + v2_token_ownerships::NFTOwnershipV2, + v2_token_utils::{ + Burn, BurnEvent, MintEvent, TokenV2Burned, TokenV2Minted, TransferEvent, + }, }, }, }, diff --git a/rust/processor/src/processors/token_v2_processor.rs b/rust/processor/src/processors/token_v2_processor.rs index b300387c7..d05a3e9c6 100644 --- a/rust/processor/src/processors/token_v2_processor.rs +++ b/rust/processor/src/processors/token_v2_processor.rs @@ -7,6 +7,7 @@ use crate::{ common::models::token_v2_models::{ raw_token_claims::{CurrentTokenPendingClaimConvertible, RawCurrentTokenPendingClaim}, raw_v1_token_royalty::{CurrentTokenRoyaltyV1Convertible, RawCurrentTokenRoyaltyV1}, + raw_v2_token_activities::{RawTokenActivityV2, TokenActivityV2Convertible}, raw_v2_token_metadata::{CurrentTokenV2MetadataConvertible, RawCurrentTokenV2Metadata}, }, postgres::models::{ @@ -617,7 +618,7 @@ impl ProcessorTrait for TokenV2Processor { current_deleted_token_datas_v2, current_token_ownerships_v2, current_deleted_token_ownerships_v2, - token_activities_v2, + raw_token_activities_v2, raw_current_token_v2_metadata, raw_current_token_royalties_v1, raw_current_token_claims, @@ -647,6 +648,11 @@ impl ProcessorTrait for TokenV2Processor { .map(CurrentTokenV2Metadata::from_raw) .collect(); + let postgres_token_activities_v2: Vec = raw_token_activities_v2 + .into_iter() + .map(TokenActivityV2::from_raw) + .collect(); + let processing_duration_in_secs = processing_start.elapsed().as_secs_f64(); let db_insertion_start = std::time::Instant::now(); @@ -683,7 +689,7 @@ impl ProcessorTrait for TokenV2Processor { ¤t_token_ownerships_v2, ¤t_deleted_token_ownerships_v2, ), - &token_activities_v2, + &postgres_token_activities_v2, &postgres_current_token_v2_metadata, &postgres_current_token_royalties_v1, &postgres_current_token_claims, @@ -735,7 +741,7 @@ pub async fn parse_v2_token( Vec, Vec, Vec, // deleted token ownerships - Vec, + Vec, Vec, Vec, Vec, @@ -910,7 +916,7 @@ pub async fn parse_v2_token( } } // handling all the token v1 events - if let Some(event) = TokenActivityV2::get_v1_from_parsed_event( + if let Some(event) = RawTokenActivityV2::get_v1_from_parsed_event( event, txn_version, txn_timestamp, @@ -922,7 +928,7 @@ pub async fn parse_v2_token( token_activities_v2.push(event); } // handling all the token v2 events - if let Some(event) = TokenActivityV2::get_nft_v2_from_parsed_event( + if let Some(event) = RawTokenActivityV2::get_nft_v2_from_parsed_event( event, txn_version, txn_timestamp, diff --git a/rust/sdk-processor/src/config/processor_config.rs b/rust/sdk-processor/src/config/processor_config.rs index a9a3a79d0..80b4da957 100644 --- a/rust/sdk-processor/src/config/processor_config.rs +++ b/rust/sdk-processor/src/config/processor_config.rs @@ -29,7 +29,7 @@ use processor::{ }, token_v2_models::{ token_claims::CurrentTokenPendingClaim, v1_token_royalty::CurrentTokenRoyaltyV1, - v2_token_metadata::CurrentTokenV2Metadata, + v2_token_activities::TokenActivityV2, v2_token_metadata::CurrentTokenV2Metadata, }, transaction_metadata_model::parquet_write_set_size_info::WriteSetSize, }, @@ -167,6 +167,7 @@ impl ProcessorConfig { CurrentTokenPendingClaim::TABLE_NAME.to_string(), CurrentTokenRoyaltyV1::TABLE_NAME.to_string(), CurrentTokenV2Metadata::TABLE_NAME.to_string(), + TokenActivityV2::TABLE_NAME.to_string(), ]), _ => HashSet::new(), // Default case for unsupported processors } diff --git a/rust/sdk-processor/src/parquet_processors/mod.rs b/rust/sdk-processor/src/parquet_processors/mod.rs index 41861f688..2ac2ae0c5 100644 --- a/rust/sdk-processor/src/parquet_processors/mod.rs +++ b/rust/sdk-processor/src/parquet_processors/mod.rs @@ -34,7 +34,7 @@ use processor::{ }, token_v2_models::{ token_claims::CurrentTokenPendingClaim, v1_token_royalty::CurrentTokenRoyaltyV1, - v2_token_metadata::CurrentTokenV2Metadata, + v2_token_activities::TokenActivityV2, v2_token_metadata::CurrentTokenV2Metadata, }, transaction_metadata_model::parquet_write_set_size_info::WriteSetSize, }, @@ -103,6 +103,7 @@ pub enum ParquetTypeEnum { CurrentTokenPendingClaims, CurrentTokenRoyaltyV1, CurrentTokenV2Metadata, + TokenActivitiesV2, } /// Trait for handling various Parquet types. @@ -191,6 +192,7 @@ impl_parquet_trait!( CurrentTokenV2Metadata, ParquetTypeEnum::CurrentTokenV2Metadata ); +impl_parquet_trait!(TokenActivityV2, ParquetTypeEnum::TokenActivitiesV2); #[derive(Debug, Clone)] #[enum_dispatch(ParquetTypeTrait)] @@ -214,6 +216,7 @@ pub enum ParquetTypeStructs { CurrentTokenPendingClaim(Vec), CurrentTokenRoyaltyV1(Vec), CurrentTokenV2Metadata(Vec), + TokenActivityV2(Vec), } impl ParquetTypeStructs { @@ -258,6 +261,7 @@ impl ParquetTypeStructs { ParquetTypeEnum::CurrentTokenV2Metadata => { ParquetTypeStructs::CurrentTokenV2Metadata(Vec::new()) }, + ParquetTypeEnum::TokenActivitiesV2 => ParquetTypeStructs::TokenActivityV2(Vec::new()), } } @@ -382,6 +386,12 @@ impl ParquetTypeStructs { ) => { handle_append!(self_data, other_data) }, + ( + ParquetTypeStructs::TokenActivityV2(self_data), + ParquetTypeStructs::TokenActivityV2(other_data), + ) => { + handle_append!(self_data, other_data) + }, _ => Err(ProcessorError::ProcessError { message: "Mismatched buffer types in append operation".to_string(), }), diff --git a/rust/sdk-processor/src/parquet_processors/parquet_token_v2_processor.rs b/rust/sdk-processor/src/parquet_processors/parquet_token_v2_processor.rs index d32a12baa..e21e522f6 100644 --- a/rust/sdk-processor/src/parquet_processors/parquet_token_v2_processor.rs +++ b/rust/sdk-processor/src/parquet_processors/parquet_token_v2_processor.rs @@ -32,7 +32,7 @@ use processor::{ bq_analytics::generic_parquet_processor::HasParquetSchema, db::parquet::models::token_v2_models::{ token_claims::CurrentTokenPendingClaim, v1_token_royalty::CurrentTokenRoyaltyV1, - v2_token_metadata::CurrentTokenV2Metadata, + v2_token_activities::TokenActivityV2, v2_token_metadata::CurrentTokenV2Metadata, }, }; use std::{collections::HashMap, sync::Arc}; @@ -137,6 +137,10 @@ impl ProcessorTrait for ParquetTokenV2Processor { ParquetTypeEnum::CurrentTokenV2Metadata, CurrentTokenV2Metadata::schema(), ), + ( + ParquetTypeEnum::TokenActivitiesV2, + TokenActivityV2::schema(), + ), ] .into_iter() .collect(); diff --git a/rust/sdk-processor/src/steps/parquet_token_v2_processor/parquet_token_v2_extractor.rs b/rust/sdk-processor/src/steps/parquet_token_v2_processor/parquet_token_v2_extractor.rs index b33c37f4a..3e990252c 100644 --- a/rust/sdk-processor/src/steps/parquet_token_v2_processor/parquet_token_v2_extractor.rs +++ b/rust/sdk-processor/src/steps/parquet_token_v2_processor/parquet_token_v2_extractor.rs @@ -14,11 +14,12 @@ use processor::{ common::models::token_v2_models::{ raw_token_claims::CurrentTokenPendingClaimConvertible, raw_v1_token_royalty::CurrentTokenRoyaltyV1Convertible, + raw_v2_token_activities::TokenActivityV2Convertible, raw_v2_token_metadata::CurrentTokenV2MetadataConvertible, }, parquet::models::token_v2_models::{ token_claims::CurrentTokenPendingClaim, v1_token_royalty::CurrentTokenRoyaltyV1, - v2_token_metadata::CurrentTokenV2Metadata, + v2_token_activities::TokenActivityV2, v2_token_metadata::CurrentTokenV2Metadata, }, postgres::models::token_models::tokens::TableMetadataForToken, }, @@ -90,7 +91,7 @@ impl Processable for ParquetTokenV2Extractor { _current_deleted_token_datas_v2, _current_token_ownerships_v2, _current_deleted_token_ownerships_v2, - _token_activities_v2, + raw_token_activities_v2, raw_current_token_v2_metadata, raw_current_token_royalties_v1, raw_current_token_claims, @@ -120,6 +121,11 @@ impl Processable for ParquetTokenV2Extractor { .map(CurrentTokenV2Metadata::from_raw) .collect(); + let parquet_token_activities_v2: Vec = raw_token_activities_v2 + .into_iter() + .map(TokenActivityV2::from_raw) + .collect(); + // Print the size of each extracted data type debug!("Processed data sizes:"); debug!( @@ -134,6 +140,7 @@ impl Processable for ParquetTokenV2Extractor { " - CurrentTokenV2Metadata: {}", parquet_current_token_v2_metadata.len() ); + debug!(" - TokenActivityV2: {}", parquet_token_activities_v2.len()); let mut map: HashMap = HashMap::new(); @@ -154,6 +161,11 @@ impl Processable for ParquetTokenV2Extractor { ParquetTypeEnum::CurrentTokenV2Metadata, ParquetTypeStructs::CurrentTokenV2Metadata(parquet_current_token_v2_metadata), ), + ( + TableFlags::TOKEN_ACTIVITIES_V2, + ParquetTypeEnum::TokenActivitiesV2, + ParquetTypeStructs::TokenActivityV2(parquet_token_activities_v2), + ), ]; // Populate the map based on opt-in tables diff --git a/rust/sdk-processor/src/steps/token_v2_processor/token_v2_extractor.rs b/rust/sdk-processor/src/steps/token_v2_processor/token_v2_extractor.rs index 398196181..abbe1fe78 100644 --- a/rust/sdk-processor/src/steps/token_v2_processor/token_v2_extractor.rs +++ b/rust/sdk-processor/src/steps/token_v2_processor/token_v2_extractor.rs @@ -11,6 +11,7 @@ use processor::{ common::models::token_v2_models::{ raw_token_claims::CurrentTokenPendingClaimConvertible, raw_v1_token_royalty::CurrentTokenRoyaltyV1Convertible, + raw_v2_token_activities::TokenActivityV2Convertible, raw_v2_token_metadata::CurrentTokenV2MetadataConvertible, }, postgres::models::{ @@ -112,7 +113,7 @@ impl Processable for TokenV2Extractor { current_deleted_token_datas_v2, current_token_ownerships_v2, current_deleted_token_ownerships_v2, - token_activities_v2, + raw_token_activities_v2, raw_current_token_v2_metadata, raw_current_token_royalties_v1, raw_current_token_claims, @@ -142,6 +143,11 @@ impl Processable for TokenV2Extractor { .map(CurrentTokenV2Metadata::from_raw) .collect(); + let postgres_token_activities_v2: Vec = raw_token_activities_v2 + .into_iter() + .map(TokenActivityV2::from_raw) + .collect(); + Ok(Some(TransactionContext { data: ( collections_v2, @@ -152,7 +158,7 @@ impl Processable for TokenV2Extractor { current_deleted_token_datas_v2, current_token_ownerships_v2, current_deleted_token_ownerships_v2, - token_activities_v2, + postgres_token_activities_v2, postgres_current_token_v2_metadata, postgres_current_token_royalties_v1, postgres_current_token_claims,