diff --git a/rust/processor/src/db/common/models/object_models/v2_objects.rs b/rust/processor/src/db/common/models/object_models/v2_objects.rs index a80a504c..35877dff 100644 --- a/rust/processor/src/db/common/models/object_models/v2_objects.rs +++ b/rust/processor/src/db/common/models/object_models/v2_objects.rs @@ -19,6 +19,8 @@ use diesel_async::RunQueryDsl; use field_count::FieldCount; use serde::{Deserialize, Serialize}; +const DELETED_RESOURCE_OWNER_ADDRESS: &str = "Unknown"; + #[derive(Clone, Debug, Deserialize, FieldCount, Serialize)] pub struct RawObject { pub transaction_version: i64, @@ -30,6 +32,7 @@ pub struct RawObject { pub allow_ungated_transfer: bool, pub is_deleted: bool, pub untransferrable: bool, + pub block_timestamp: chrono::NaiveDateTime, } pub trait ObjectConvertible { @@ -46,6 +49,7 @@ pub struct RawCurrentObject { pub last_transaction_version: i64, pub is_deleted: bool, pub untransferrable: bool, + pub block_timestamp: chrono::NaiveDateTime, } pub trait CurrentObjectConvertible { @@ -73,14 +77,10 @@ impl RawObject { txn_version: i64, write_set_change_index: i64, object_metadata_mapping: &ObjectAggregatedDataMapping, + block_timestamp: chrono::NaiveDateTime, ) -> anyhow::Result> { let address = standardize_address(&write_resource.address.to_string()); - println!("address: {:?}", address); if let Some(object_aggregated_metadata) = object_metadata_mapping.get(&address) { - println!( - "object_aggregated_metadata: {:?}", - object_aggregated_metadata - ); // do something let object_with_metadata = object_aggregated_metadata.object.clone(); let object_core = object_with_metadata.object_core; @@ -101,6 +101,7 @@ impl RawObject { allow_ungated_transfer: object_core.allow_ungated_transfer, is_deleted: false, untransferrable, + block_timestamp, }, RawCurrentObject { object_address: address, @@ -111,6 +112,7 @@ impl RawObject { last_transaction_version: txn_version, is_deleted: false, untransferrable, + block_timestamp, }, ))) } else { @@ -126,7 +128,7 @@ impl RawObject { txn_version: i64, write_set_change_index: i64, object_mapping: &AHashMap, - conn: &mut DbPoolConnection<'_>, + conn: &mut Option>, query_retries: u32, query_retry_delay_ms: u64, ) -> anyhow::Result> { @@ -137,52 +139,91 @@ impl RawObject { txn_version, 0, // Placeholder, this isn't used anyway ); - let previous_object = if let Some(object) = object_mapping.get(&resource.address) { - object.clone() + // Add a logid here to handle None conn + if conn.is_none() { + // This is a hack to prevent the program for parquet + Ok(Some(( + Self { + transaction_version: txn_version, + write_set_change_index, + object_address: resource.address.clone(), + owner_address: DELETED_RESOURCE_OWNER_ADDRESS.to_string(), + state_key_hash: resource.state_key_hash.clone(), + guid_creation_num: BigDecimal::default(), + allow_ungated_transfer: false, + is_deleted: true, + untransferrable: false, + block_timestamp: chrono::NaiveDateTime::default(), + }, + RawCurrentObject { + object_address: resource.address.clone(), + owner_address: DELETED_RESOURCE_OWNER_ADDRESS.to_string(), + state_key_hash: resource.state_key_hash.clone(), + last_guid_creation_num: BigDecimal::default(), + allow_ungated_transfer: false, + last_transaction_version: txn_version, + is_deleted: true, + untransferrable: false, + block_timestamp: chrono::NaiveDateTime::default(), + }, + ))) } else { - match Self::get_current_object( - conn, - &resource.address, - query_retries, - query_retry_delay_ms, - ) - .await - { - Ok(object) => object, - Err(_) => { - tracing::error!( + let previous_object = if let Some(object) = object_mapping.get(&resource.address) { + object.clone() + } else if let Some(ref mut conn) = conn { + match Self::get_current_object( + conn, + &resource.address, + query_retries, + query_retry_delay_ms, + ) + .await + { + Ok(object) => object, + Err(_) => { + tracing::error!( transaction_version = txn_version, lookup_key = &resource.address, "Missing current_object for object_address: {}. You probably should backfill db.", resource.address, ); + return Ok(None); + }, + } + } else { + tracing::error!( + transaction_version = txn_version, + lookup_key = &resource.address, + "Connection to DB is missing. You may need to investigate", + ); return Ok(None); + }; + Ok(Some(( + Self { + transaction_version: txn_version, + write_set_change_index, + object_address: resource.address.clone(), + owner_address: previous_object.owner_address.clone(), + state_key_hash: resource.state_key_hash.clone(), + guid_creation_num: previous_object.last_guid_creation_num.clone(), + allow_ungated_transfer: previous_object.allow_ungated_transfer, + is_deleted: true, + untransferrable: previous_object.untransferrable, + block_timestamp: previous_object.block_timestamp, }, - } - }; - Ok(Some(( - Self { - transaction_version: txn_version, - write_set_change_index, - object_address: resource.address.clone(), - owner_address: previous_object.owner_address.clone(), - state_key_hash: resource.state_key_hash.clone(), - guid_creation_num: previous_object.last_guid_creation_num.clone(), - allow_ungated_transfer: previous_object.allow_ungated_transfer, - is_deleted: true, - untransferrable: previous_object.untransferrable, - }, - RawCurrentObject { - object_address: resource.address, - owner_address: previous_object.owner_address.clone(), - state_key_hash: resource.state_key_hash, - last_guid_creation_num: previous_object.last_guid_creation_num.clone(), - allow_ungated_transfer: previous_object.allow_ungated_transfer, - last_transaction_version: txn_version, - is_deleted: true, - untransferrable: previous_object.untransferrable, - }, - ))) + RawCurrentObject { + object_address: resource.address, + owner_address: previous_object.owner_address.clone(), + state_key_hash: resource.state_key_hash, + last_guid_creation_num: previous_object.last_guid_creation_num.clone(), + allow_ungated_transfer: previous_object.allow_ungated_transfer, + last_transaction_version: txn_version, + is_deleted: true, + untransferrable: previous_object.untransferrable, + block_timestamp: previous_object.block_timestamp, + }, + ))) + } } else { Ok(None) } @@ -210,6 +251,7 @@ impl RawObject { last_transaction_version: res.last_transaction_version, is_deleted: res.is_deleted, untransferrable: res.untransferrable, + block_timestamp: chrono::NaiveDateTime::default(), // this won't be used }); }, Err(_) => { diff --git a/rust/processor/src/db/parquet/models/mod.rs b/rust/processor/src/db/parquet/models/mod.rs index ca073da7..c14a199e 100644 --- a/rust/processor/src/db/parquet/models/mod.rs +++ b/rust/processor/src/db/parquet/models/mod.rs @@ -6,3 +6,4 @@ pub mod fungible_asset_models; pub mod token_v2_models; pub mod transaction_metadata_model; pub mod user_transaction_models; +pub mod object_models; \ No newline at end of file diff --git a/rust/processor/src/db/parquet/models/object_models/v2_objects.rs b/rust/processor/src/db/parquet/models/object_models/v2_objects.rs index 6587eac1..3729208b 100644 --- a/rust/processor/src/db/parquet/models/object_models/v2_objects.rs +++ b/rust/processor/src/db/parquet/models/object_models/v2_objects.rs @@ -5,49 +5,115 @@ #![allow(clippy::extra_unused_lifetimes)] #![allow(clippy::unused_unit)] -use super::v2_object_utils::{CurrentObjectPK, ObjectAggregatedDataMapping}; -use crate::{ - db::postgres::models::default_models::move_resources::MoveResource, - schema::{current_objects, objects}, - utils::{database::DbPoolConnection, util::standardize_address}, -}; -use ahash::AHashMap; -use aptos_protos::transaction::v1::{DeleteResource, WriteResource}; -use bigdecimal::BigDecimal; -use diesel::prelude::*; -use diesel_async::RunQueryDsl; +use bigdecimal::{ToPrimitive}; use field_count::FieldCount; use serde::{Deserialize, Serialize}; +use allocative_derive::Allocative; +use parquet_derive::ParquetRecordWriter; +use crate::bq_analytics::generic_parquet_processor::NamedTable; +use crate::bq_analytics::generic_parquet_processor::HasVersion; +use crate::bq_analytics::generic_parquet_processor::GetTimeStamp; +use crate::db::common::models::object_models::v2_objects::ObjectConvertible; +use crate::db::common::models::object_models::v2_objects::CurrentObjectConvertible; +use crate::db::common::models::object_models::v2_objects::RawObject; +use crate::db::common::models::object_models::v2_objects::RawCurrentObject; -#[derive(Clone, Debug, Deserialize, FieldCount, Identifiable, Insertable, Serialize)] -pub struct RawObject { - pub transaction_version: i64, +#[derive( + Allocative, Clone, Debug, Default, Deserialize, FieldCount, ParquetRecordWriter, Serialize, +)] +pub struct Object { + pub txn_version: i64, pub write_set_change_index: i64, pub object_address: String, pub owner_address: String, pub state_key_hash: String, - pub guid_creation_num: BigDecimal, + pub guid_creation_num: u64, // BigDecimal, pub allow_ungated_transfer: bool, pub is_deleted: bool, pub untransferrable: bool, + #[allocative(skip)] + pub block_timestamp: chrono::NaiveDateTime, } -pub trait ObjectConvertible { - fn from_raw(raw_item: RawObject) -> Self; +impl NamedTable for Object { + const TABLE_NAME: &'static str = "objects"; } -#[derive(Clone, Debug, Deserialize, FieldCount, Identifiable, Insertable, Serialize)] -pub struct RawCurrentObject { +impl HasVersion for Object { + fn version(&self) -> i64 { + self.txn_version + } +} + +impl GetTimeStamp for Object { + fn get_timestamp(&self) -> chrono::NaiveDateTime { + self.block_timestamp + } +} + +impl ObjectConvertible for Object { + fn from_raw(raw_item: RawObject) -> Self { + Self { + txn_version: raw_item.transaction_version, + write_set_change_index: raw_item.write_set_change_index, + object_address: raw_item.object_address, + owner_address: raw_item.owner_address, + state_key_hash: raw_item.state_key_hash, + guid_creation_num: raw_item.guid_creation_num.to_u64().unwrap(), + allow_ungated_transfer: raw_item.allow_ungated_transfer, + is_deleted: raw_item.is_deleted, + untransferrable: raw_item.untransferrable, + block_timestamp: raw_item.block_timestamp, + } + } +} + +#[derive( + Allocative, Clone, Debug, Default, Deserialize, FieldCount, ParquetRecordWriter, Serialize, +)] +pub struct CurrentObject { pub object_address: String, pub owner_address: String, pub state_key_hash: String, pub allow_ungated_transfer: bool, - pub last_guid_creation_num: BigDecimal, + pub last_guid_creation_num: u64, // BigDecimal, pub last_transaction_version: i64, pub is_deleted: bool, pub untransferrable: bool, + #[allocative(skip)] + pub block_timestamp: chrono::NaiveDateTime, +} + + +impl NamedTable for CurrentObject { + const TABLE_NAME: &'static str = "objects"; +} + +impl HasVersion for CurrentObject { + fn version(&self) -> i64 { + self.last_transaction_version + } } -pub trait CurrentObjectConvertible { - fn from_raw(raw_item: RawCurrentObject) -> Self; +impl GetTimeStamp for CurrentObject { + fn get_timestamp(&self) -> chrono::NaiveDateTime { + self.block_timestamp + } +} + + +impl CurrentObjectConvertible for CurrentObject { + fn from_raw(raw_item: RawCurrentObject) -> Self { + Self { + object_address: raw_item.object_address, + owner_address: raw_item.owner_address, + state_key_hash: raw_item.state_key_hash, + allow_ungated_transfer: raw_item.allow_ungated_transfer, + last_guid_creation_num: raw_item.last_guid_creation_num.to_u64().unwrap(), + last_transaction_version: raw_item.last_transaction_version, + is_deleted: raw_item.is_deleted, + untransferrable: raw_item.untransferrable, + block_timestamp: raw_item.block_timestamp, + } + } } diff --git a/rust/processor/src/processors/objects_processor.rs b/rust/processor/src/processors/objects_processor.rs index f13b7ce7..10d9d13d 100644 --- a/rust/processor/src/processors/objects_processor.rs +++ b/rust/processor/src/processors/objects_processor.rs @@ -39,6 +39,8 @@ use diesel::{ use serde::{Deserialize, Serialize}; use std::fmt::Debug; use tracing::error; +use crate::utils::database::DbPoolConnection; +use crate::utils::util::parse_timestamp; #[derive(Clone, Debug, Deserialize, Serialize)] #[serde(deny_unknown_fields)] @@ -178,128 +180,21 @@ impl ProcessorTrait for ObjectsProcessor { let processing_start: std::time::Instant = std::time::Instant::now(); let last_transaction_timestamp = transactions.last().unwrap().timestamp.clone(); - let mut conn = self.get_conn().await; + let conn = self.get_conn().await; let query_retries = self.config.query_retries; let query_retry_delay_ms = self.config.query_retry_delay_ms; - // Moving object handling here because we need a single object - // map through transactions for lookups - let mut all_objects = vec![]; - let mut all_current_objects = AHashMap::new(); - let mut object_metadata_helper: ObjectAggregatedDataMapping = AHashMap::new(); - - for txn in &transactions { - let txn_version = txn.version as i64; - let changes = &txn - .info - .as_ref() - .unwrap_or_else(|| { - panic!( - "Transaction info doesn't exist! Transaction {}", - txn_version - ) - }) - .changes; - // First pass to get all the object cores - for wsc in changes.iter() { - if let Change::WriteResource(wr) = wsc.change.as_ref().unwrap() { - let address = standardize_address(&wr.address.to_string()); - if let Some(object_with_metadata) = - ObjectWithMetadata::from_write_resource(wr).unwrap() - { - // Object core is the first struct that we need to get - object_metadata_helper.insert(address.clone(), ObjectAggregatedData { - object: object_with_metadata, - token: None, - fungible_asset_store: None, - // The following structs are unused in this processor - fungible_asset_metadata: None, - aptos_collection: None, - fixed_supply: None, - unlimited_supply: None, - concurrent_supply: None, - property_map: None, - transfer_events: vec![], - untransferable: None, - fungible_asset_supply: None, - concurrent_fungible_asset_supply: None, - concurrent_fungible_asset_balance: None, - token_identifier: None, - }); - } - } - } - - // Second pass to get object metadata - for wsc in changes.iter() { - if let Change::WriteResource(write_resource) = wsc.change.as_ref().unwrap() { - let address = standardize_address(&write_resource.address.to_string()); - if let Some(aggregated_data) = object_metadata_helper.get_mut(&address) { - if let Some(untransferable) = - Untransferable::from_write_resource(write_resource).unwrap() - { - aggregated_data.untransferable = Some(untransferable); - } - } - } - } - - // Second pass to construct the object data - for (index, wsc) in changes.iter().enumerate() { - let index: i64 = index as i64; - match wsc.change.as_ref().unwrap() { - Change::WriteResource(inner) => { - println!("index: {:?}", index); - if let Some((object, current_object)) = &RawObject::from_write_resource( - inner, - txn_version, - index, - &object_metadata_helper, - ) - .unwrap() - { - all_objects.push(object.clone()); - all_current_objects - .insert(object.object_address.clone(), current_object.clone()); - } - }, - Change::DeleteResource(inner) => { - // Passing all_current_objects into the function so that we can get the owner of the deleted - // resource if it was handled in the same batch - if let Some((object, current_object)) = RawObject::from_delete_resource( - inner, - txn_version, - index, - &all_current_objects, - &mut conn, - query_retries, - query_retry_delay_ms, - ) - .await - .unwrap() - { - all_objects.push(object.clone()); - all_current_objects - .insert(object.object_address.clone(), current_object.clone()); - } - }, - _ => {}, - }; - } - } - - // Sort by PK - let mut all_current_objects = all_current_objects - .into_values() - .collect::>(); - all_current_objects.sort_by(|a, b| a.object_address.cmp(&b.object_address)); + let ( + mut raw_all_objects, + raw_all_current_objects, + ) = process_objects(transactions, &mut Some(conn), query_retries, query_retry_delay_ms).await; if self.deprecated_tables.contains(TableFlags::OBJECTS) { - all_objects.clear(); + raw_all_objects.clear(); } - let postgres_objects: Vec = all_objects.into_iter().map(Object::from_raw).collect(); - let postgres_current_objects: Vec = all_current_objects + let postgres_objects: Vec = raw_all_objects.into_iter().map(Object::from_raw).collect(); + let postgres_current_objects: Vec = raw_all_current_objects .into_iter() .map(CurrentObject::from_raw) .collect(); @@ -345,3 +240,131 @@ impl ProcessorTrait for ObjectsProcessor { &self.connection_pool } } + +pub async fn process_objects( + transactions: Vec, + conn: &mut Option>, + query_retries: u32, + query_retry_delay_ms: u64, +) -> ( + Vec, + Vec, +) { + + // Moving object handling here because we need a single object + // map through transactions for lookups + let mut all_objects = vec![]; + let mut all_current_objects = AHashMap::new(); + let mut object_metadata_helper: ObjectAggregatedDataMapping = AHashMap::new(); + + for txn in &transactions { + let txn_version = txn.version as i64; + let changes = &txn + .info + .as_ref() + .unwrap_or_else(|| { + panic!( + "Transaction info doesn't exist! Transaction {}", + txn_version + ) + }) + .changes; + + let txn_timestamp = parse_timestamp(txn.timestamp.as_ref().unwrap(), txn_version); + + // First pass to get all the object cores + for wsc in changes.iter() { + if let Change::WriteResource(wr) = wsc.change.as_ref().unwrap() { + let address = standardize_address(&wr.address.to_string()); + if let Some(object_with_metadata) = + ObjectWithMetadata::from_write_resource(wr).unwrap() + { + // Object core is the first struct that we need to get + object_metadata_helper.insert(address.clone(), ObjectAggregatedData { + object: object_with_metadata, + token: None, + fungible_asset_store: None, + // The following structs are unused in this processor + fungible_asset_metadata: None, + aptos_collection: None, + fixed_supply: None, + unlimited_supply: None, + concurrent_supply: None, + property_map: None, + transfer_events: vec![], + untransferable: None, + fungible_asset_supply: None, + concurrent_fungible_asset_supply: None, + concurrent_fungible_asset_balance: None, + token_identifier: None, + }); + } + } + } + + // Second pass to get object metadata + for wsc in changes.iter() { + if let Change::WriteResource(write_resource) = wsc.change.as_ref().unwrap() { + let address = standardize_address(&write_resource.address.to_string()); + if let Some(aggregated_data) = object_metadata_helper.get_mut(&address) { + if let Some(untransferable) = + Untransferable::from_write_resource(write_resource).unwrap() + { + aggregated_data.untransferable = Some(untransferable); + } + } + } + } + + // Second pass to construct the object data + for (index, wsc) in changes.iter().enumerate() { + let index: i64 = index as i64; + match wsc.change.as_ref().unwrap() { + Change::WriteResource(inner) => { + if let Some((object, current_object)) = &RawObject::from_write_resource( + inner, + txn_version, + index, + &object_metadata_helper, + txn_timestamp + ) + .unwrap() + { + all_objects.push(object.clone()); + all_current_objects + .insert(object.object_address.clone(), current_object.clone()); + } + }, + Change::DeleteResource(inner) => { + // Passing all_current_objects into the function so that we can get the owner of the deleted + // resource if it was handled in the same batch + if let Some((object, current_object)) = RawObject::from_delete_resource( + inner, + txn_version, + index, + &all_current_objects, + conn, + query_retries, + query_retry_delay_ms, + ) + .await + .unwrap() + { + all_objects.push(object.clone()); + all_current_objects + .insert(object.object_address.clone(), current_object.clone()); + } + }, + _ => {}, + }; + } + } + + // Sort by PK + let mut all_current_objects = all_current_objects + .into_values() + .collect::>(); + all_current_objects.sort_by(|a, b| a.object_address.cmp(&b.object_address)); + + (all_objects, all_current_objects) +} \ No newline at end of file diff --git a/rust/sdk-processor/src/config/indexer_processor_config.rs b/rust/sdk-processor/src/config/indexer_processor_config.rs index 6b4e5943..bdf8ebfa 100644 --- a/rust/sdk-processor/src/config/indexer_processor_config.rs +++ b/rust/sdk-processor/src/config/indexer_processor_config.rs @@ -8,6 +8,7 @@ use crate::{ parquet_default_processor::ParquetDefaultProcessor, parquet_events_processor::ParquetEventsProcessor, parquet_fungible_asset_processor::ParquetFungibleAssetProcessor, + parquet_objects_processor::ParquetObjectsProcessor, parquet_token_v2_processor::ParquetTokenV2Processor, parquet_transaction_metadata_processor::ParquetTransactionMetadataProcessor, parquet_user_transaction_processor::ParquetUserTransactionsProcessor, @@ -117,6 +118,10 @@ impl RunnableConfig for IndexerProcessorConfig { let parquet_token_v2_processor = ParquetTokenV2Processor::new(self.clone()).await?; parquet_token_v2_processor.run_processor().await }, + ProcessorConfig::ParquetObjectsProcessor(_) => { + let parquet_objects_processor = ParquetObjectsProcessor::new(self.clone()).await?; + parquet_objects_processor.run_processor().await + }, } } diff --git a/rust/sdk-processor/src/config/processor_config.rs b/rust/sdk-processor/src/config/processor_config.rs index d0812a1f..3cb54540 100644 --- a/rust/sdk-processor/src/config/processor_config.rs +++ b/rust/sdk-processor/src/config/processor_config.rs @@ -27,6 +27,7 @@ use processor::{ }, parquet_v2_fungible_metadata::FungibleAssetMetadataModel, }, + object_models::v2_objects::{CurrentObject, Object}, token_v2_models::{ token_claims::CurrentTokenPendingClaim, v1_token_royalty::CurrentTokenRoyaltyV1, @@ -94,6 +95,7 @@ pub enum ProcessorConfig { ParquetTransactionMetadataProcessor(ParquetDefaultProcessorConfig), ParquetAccountTransactionsProcessor(ParquetDefaultProcessorConfig), ParquetTokenV2Processor(ParquetDefaultProcessorConfig), + ParquetObjectsProcessor(ParquetDefaultProcessorConfig), } impl ProcessorConfig { @@ -115,6 +117,7 @@ impl ProcessorConfig { | ProcessorConfig::ParquetTransactionMetadataProcessor(config) | ProcessorConfig::ParquetAccountTransactionsProcessor(config) | ProcessorConfig::ParquetTokenV2Processor(config) + | ProcessorConfig::ParquetObjectsProcessor(config) | ProcessorConfig::ParquetFungibleAssetProcessor(config) => { // Get the processor name as a prefix let processor_name = self.name(); @@ -183,6 +186,10 @@ impl ProcessorConfig { TokenOwnershipV2::TABLE_NAME.to_string(), CurrentTokenOwnershipV2::TABLE_NAME.to_string(), ]), + ProcessorName::ParquetObjectsProcessor => HashSet::from([ + Object::TABLE_NAME.to_string(), + CurrentObject::TABLE_NAME.to_string(), + ]), _ => HashSet::new(), // Default case for unsupported processors } } diff --git a/rust/sdk-processor/src/parquet_processors/mod.rs b/rust/sdk-processor/src/parquet_processors/mod.rs index a7e75b1d..d553a54e 100644 --- a/rust/sdk-processor/src/parquet_processors/mod.rs +++ b/rust/sdk-processor/src/parquet_processors/mod.rs @@ -33,6 +33,7 @@ use processor::{ }, parquet_v2_fungible_metadata::FungibleAssetMetadataModel, }, + object_models::v2_objects::{CurrentObject, Object}, token_v2_models::{ token_claims::CurrentTokenPendingClaim, v1_token_royalty::CurrentTokenRoyaltyV1, @@ -60,6 +61,7 @@ pub mod parquet_ans_processor; pub mod parquet_default_processor; pub mod parquet_events_processor; pub mod parquet_fungible_asset_processor; +pub mod parquet_objects_processor; pub mod parquet_token_v2_processor; pub mod parquet_transaction_metadata_processor; pub mod parquet_user_transaction_processor; @@ -123,6 +125,9 @@ pub enum ParquetTypeEnum { CurrentTokenDatasV2, TokenOwnershipsV2, CurrentTokenOwnershipsV2, + // Objects + Objects, + CurrentObjects, } /// Trait for handling various Parquet types. @@ -221,7 +226,8 @@ impl_parquet_trait!( CurrentTokenOwnershipV2, ParquetTypeEnum::CurrentTokenOwnershipsV2 ); - +impl_parquet_trait!(Object, ParquetTypeEnum::Objects); +impl_parquet_trait!(CurrentObject, ParquetTypeEnum::CurrentObjects); #[derive(Debug, Clone)] #[enum_dispatch(ParquetTypeTrait)] pub enum ParquetTypeStructs { @@ -251,6 +257,8 @@ pub enum ParquetTypeStructs { CurrentTokenDataV2(Vec), TokenOwnershipV2(Vec), CurrentTokenOwnershipV2(Vec), + Object(Vec), + CurrentObject(Vec), } impl ParquetTypeStructs { @@ -306,6 +314,8 @@ impl ParquetTypeStructs { ParquetTypeEnum::CurrentTokenOwnershipsV2 => { ParquetTypeStructs::CurrentTokenOwnershipV2(Vec::new()) }, + ParquetTypeEnum::Objects => ParquetTypeStructs::Object(Vec::new()), + ParquetTypeEnum::CurrentObjects => ParquetTypeStructs::CurrentObject(Vec::new()), } } @@ -472,6 +482,15 @@ impl ParquetTypeStructs { ) => { handle_append!(self_data, other_data) }, + (ParquetTypeStructs::Object(self_data), ParquetTypeStructs::Object(other_data)) => { + handle_append!(self_data, other_data) + }, + ( + ParquetTypeStructs::CurrentObject(self_data), + ParquetTypeStructs::CurrentObject(other_data), + ) => { + handle_append!(self_data, other_data) + }, _ => Err(ProcessorError::ProcessError { message: "Mismatched buffer types in append operation".to_string(), }), diff --git a/rust/sdk-processor/src/parquet_processors/parquet_objects_processor.rs b/rust/sdk-processor/src/parquet_processors/parquet_objects_processor.rs new file mode 100644 index 00000000..f91c2392 --- /dev/null +++ b/rust/sdk-processor/src/parquet_processors/parquet_objects_processor.rs @@ -0,0 +1,176 @@ +use crate::{ + config::{ + db_config::DbConfig, indexer_processor_config::IndexerProcessorConfig, + processor_config::ProcessorConfig, + }, + parquet_processors::{ + initialize_database_pool, initialize_gcs_client, initialize_parquet_buffer_step, + set_backfill_table_flag, ParquetTypeEnum, + }, + steps::{ + common::{ + parquet_version_tracker_step::ParquetVersionTrackerStep, + processor_status_saver::get_processor_status_saver, + }, + parquet_objects_processor::parquet_objects_extractor::ParquetObjectsExtractor, + }, + utils::{ + chain_id::check_or_update_chain_id, + database::{run_migrations, ArcDbPool}, + starting_version::get_min_last_success_version_parquet, + }, +}; +use anyhow::Context; +use aptos_indexer_processor_sdk::{ + aptos_indexer_transaction_stream::{TransactionStream, TransactionStreamConfig}, + builder::ProcessorBuilder, + common_steps::{TransactionStreamStep, DEFAULT_UPDATE_PROCESSOR_STATUS_SECS}, + traits::{processor_trait::ProcessorTrait, IntoRunnableStep}, +}; +use parquet::schema::types::Type; +use processor::{ + bq_analytics::generic_parquet_processor::HasParquetSchema, + db::parquet::models::object_models::v2_objects::{CurrentObject, Object}, +}; +use std::{collections::HashMap, sync::Arc}; +use tracing::{debug, info}; + +pub struct ParquetObjectsProcessor { + pub config: IndexerProcessorConfig, + pub db_pool: ArcDbPool, // for processor status +} + +impl ParquetObjectsProcessor { + pub async fn new(config: IndexerProcessorConfig) -> anyhow::Result { + let db_pool = initialize_database_pool(&config.db_config).await?; + Ok(Self { config, db_pool }) + } +} +#[async_trait::async_trait] +impl ProcessorTrait for ParquetObjectsProcessor { + fn name(&self) -> &'static str { + self.config.processor_config.name() + } + + async fn run_processor(&self) -> anyhow::Result<()> { + // Run Migrations + let parquet_db_config = match self.config.db_config { + DbConfig::ParquetConfig(ref parquet_config) => { + run_migrations( + parquet_config.connection_string.clone(), + self.db_pool.clone(), + ) + .await; + parquet_config + }, + _ => { + return Err(anyhow::anyhow!( + "Invalid db config for ParquetObjectsProcessor {:?}", + self.config.db_config + )); + }, + }; + + // Check and update the ledger chain id to ensure we're indexing the correct chain + let grpc_chain_id = TransactionStream::new(self.config.transaction_stream_config.clone()) + .await? + .get_chain_id() + .await?; + check_or_update_chain_id(grpc_chain_id as i64, self.db_pool.clone()).await?; + + let parquet_processor_config = match self.config.processor_config.clone() { + ProcessorConfig::ParquetObjectsProcessor(parquet_processor_config) => { + parquet_processor_config + }, + _ => { + return Err(anyhow::anyhow!( + "Invalid processor configuration for ParquetObjectsProcessor {:?}", + self.config.processor_config + )); + }, + }; + + let processor_status_table_names = self + .config + .processor_config + .get_processor_status_table_names() + .context("Failed to get table names for the processor status table")?; + + let starting_version = get_min_last_success_version_parquet( + &self.config, + self.db_pool.clone(), + processor_status_table_names, + ) + .await?; + + // Define processor transaction stream config + let transaction_stream = TransactionStreamStep::new(TransactionStreamConfig { + starting_version: Some(starting_version), + ..self.config.transaction_stream_config.clone() + }) + .await?; + + let backfill_table = set_backfill_table_flag(parquet_processor_config.backfill_table); + let parquet_objects_extractor = ParquetObjectsExtractor { + opt_in_tables: backfill_table, + }; + + let gcs_client = + initialize_gcs_client(parquet_db_config.google_application_credentials.clone()).await; + + let parquet_type_to_schemas: HashMap> = [ + (ParquetTypeEnum::Objects, Object::schema()), + (ParquetTypeEnum::CurrentObjects, CurrentObject::schema()), + ] + .into_iter() + .collect(); + + let default_size_buffer_step = initialize_parquet_buffer_step( + gcs_client.clone(), + parquet_type_to_schemas, + parquet_processor_config.upload_interval, + parquet_processor_config.max_buffer_size, + parquet_db_config.bucket_name.clone(), + parquet_db_config.bucket_root.clone(), + self.name().to_string(), + ) + .await + .unwrap_or_else(|e| { + panic!("Failed to initialize parquet buffer step: {:?}", e); + }); + + let parquet_version_tracker_step = ParquetVersionTrackerStep::new( + get_processor_status_saver(self.db_pool.clone(), self.config.clone()), + DEFAULT_UPDATE_PROCESSOR_STATUS_SECS, + ); + + let channel_size = parquet_processor_config.channel_size; + + // Connect processor steps together + let (_, buffer_receiver) = ProcessorBuilder::new_with_inputless_first_step( + transaction_stream.into_runnable_step(), + ) + .connect_to(parquet_objects_extractor.into_runnable_step(), channel_size) + .connect_to(default_size_buffer_step.into_runnable_step(), channel_size) + .connect_to( + parquet_version_tracker_step.into_runnable_step(), + channel_size, + ) + .end_and_return_output_receiver(channel_size); + + loop { + match buffer_receiver.recv().await { + Ok(txn_context) => { + debug!( + "Finished processing versions [{:?}, {:?}]", + txn_context.metadata.start_version, txn_context.metadata.end_version, + ); + }, + Err(e) => { + info!("No more transactions in channel: {:?}", e); + break Ok(()); + }, + } + } + } +} diff --git a/rust/sdk-processor/src/steps/common/gcs_uploader.rs b/rust/sdk-processor/src/steps/common/gcs_uploader.rs index 1dc3575b..074f3295 100644 --- a/rust/sdk-processor/src/steps/common/gcs_uploader.rs +++ b/rust/sdk-processor/src/steps/common/gcs_uploader.rs @@ -127,7 +127,7 @@ impl GCSUploader { for<'a> &'a [ParquetType]: RecordWriter, { if data.is_empty() { - debug!("Buffer is empty, skipping upload."); + println!("Buffer is empty, skipping upload."); return Ok(()); } diff --git a/rust/sdk-processor/src/steps/mod.rs b/rust/sdk-processor/src/steps/mod.rs index 5f0f944e..4e283b13 100644 --- a/rust/sdk-processor/src/steps/mod.rs +++ b/rust/sdk-processor/src/steps/mod.rs @@ -14,6 +14,7 @@ pub mod parquet_account_transactions_processor; pub mod parquet_default_processor; pub mod parquet_events_processor; pub mod parquet_fungible_asset_processor; +pub mod parquet_objects_processor; pub mod parquet_token_v2_processor; pub mod parquet_transaction_metadata_processor; pub mod parquet_user_transaction_processor; diff --git a/rust/sdk-processor/src/steps/objects_processor/objects_extractor.rs b/rust/sdk-processor/src/steps/objects_processor/objects_extractor.rs index 9187a3e4..360fecaa 100644 --- a/rust/sdk-processor/src/steps/objects_processor/objects_extractor.rs +++ b/rust/sdk-processor/src/steps/objects_processor/objects_extractor.rs @@ -1,27 +1,17 @@ use crate::utils::database::ArcDbPool; -use ahash::AHashMap; use aptos_indexer_processor_sdk::{ - aptos_protos::transaction::v1::{write_set_change::Change, Transaction}, + aptos_protos::transaction::v1::Transaction, traits::{async_step::AsyncRunType, AsyncStep, NamedStep, Processable}, types::transaction_context::TransactionContext, - utils::{convert::standardize_address, errors::ProcessorError}, + utils::errors::ProcessorError, }; use async_trait::async_trait; use processor::{ db::{ - common::models::object_models::{ - v2_object_utils::{ - ObjectAggregatedData, ObjectAggregatedDataMapping, ObjectWithMetadata, - }, - v2_objects::{ - CurrentObjectConvertible, ObjectConvertible, RawCurrentObject, RawObject, - }, - }, - postgres::models::{ - object_models::v2_objects::{CurrentObject, Object}, - resources::FromWriteResource, - }, + common::models::object_models::v2_objects::{CurrentObjectConvertible, ObjectConvertible}, + postgres::models::object_models::v2_objects::{CurrentObject, Object}, }, + processors::objects_processor::process_objects, utils::table_flags::TableFlags, }; @@ -62,7 +52,7 @@ impl Processable for ObjectsExtractor { &mut self, transactions: TransactionContext>, ) -> Result, Vec)>>, ProcessorError> { - let mut conn = self + let conn = self .conn_pool .get() .await @@ -73,112 +63,22 @@ impl Processable for ObjectsExtractor { let query_retries = self.query_retries; let query_retry_delay_ms = self.query_retry_delay_ms; - // Moving object handling here because we need a single object - // map through transactions for lookups - let mut all_objects = vec![]; - let mut all_current_objects = AHashMap::new(); - let mut object_metadata_helper: ObjectAggregatedDataMapping = AHashMap::new(); - - for txn in &transactions.data { - let txn_version = txn.version as i64; - let changes = &txn - .info - .as_ref() - .unwrap_or_else(|| { - panic!( - "Transaction info doesn't exist! Transaction {}", - txn_version - ) - }) - .changes; - - // First pass to get all the object cores - for wsc in changes.iter() { - if let Change::WriteResource(wr) = wsc.change.as_ref().unwrap() { - let address = standardize_address(&wr.address.to_string()); - if let Some(object_with_metadata) = - ObjectWithMetadata::from_write_resource(wr).unwrap() - { - // Object core is the first struct that we need to get - object_metadata_helper.insert(address.clone(), ObjectAggregatedData { - object: object_with_metadata, - token: None, - fungible_asset_store: None, - // The following structs are unused in this processor - fungible_asset_metadata: None, - aptos_collection: None, - fixed_supply: None, - unlimited_supply: None, - concurrent_supply: None, - property_map: None, - transfer_events: vec![], - untransferable: None, - fungible_asset_supply: None, - concurrent_fungible_asset_supply: None, - concurrent_fungible_asset_balance: None, - token_identifier: None, - }); - } - } - } - - // Second pass to construct the object data - for (index, wsc) in changes.iter().enumerate() { - let index: i64 = index as i64; - match wsc.change.as_ref().unwrap() { - Change::WriteResource(inner) => { - if let Some((object, current_object)) = &RawObject::from_write_resource( - inner, - txn_version, - index, - &object_metadata_helper, - ) - .unwrap() - { - all_objects.push(object.clone()); - all_current_objects - .insert(object.object_address.clone(), current_object.clone()); - } - }, - Change::DeleteResource(inner) => { - // Passing all_current_objects into the function so that we can get the owner of the deleted - // resource if it was handled in the same batch - if let Some((object, current_object)) = RawObject::from_delete_resource( - inner, - txn_version, - index, - &all_current_objects, - &mut conn, - query_retries, - query_retry_delay_ms, - ) - .await - .unwrap() - { - all_objects.push(object.clone()); - all_current_objects - .insert(object.object_address.clone(), current_object.clone()); - } - }, - _ => {}, - }; - } - } - - // Sort by PK - let mut all_current_objects = all_current_objects - .into_values() - .collect::>(); - all_current_objects.sort_by(|a, b| a.object_address.cmp(&b.object_address)); + let (mut raw_all_objects, raw_all_current_objects) = process_objects( + transactions.data, + &mut Some(conn), + query_retries, + query_retry_delay_ms, + ) + .await; if self.deprecated_tables.contains(TableFlags::OBJECTS) { - all_objects.clear(); + raw_all_objects.clear(); } let postgres_all_objects: Vec = - all_objects.into_iter().map(Object::from_raw).collect(); + raw_all_objects.into_iter().map(Object::from_raw).collect(); - let postgres_all_current_objects: Vec = all_current_objects + let postgres_all_current_objects: Vec = raw_all_current_objects .into_iter() .map(CurrentObject::from_raw) .collect(); diff --git a/rust/sdk-processor/src/steps/parquet_objects_processor/mod.rs b/rust/sdk-processor/src/steps/parquet_objects_processor/mod.rs new file mode 100644 index 00000000..7dfc9a12 --- /dev/null +++ b/rust/sdk-processor/src/steps/parquet_objects_processor/mod.rs @@ -0,0 +1 @@ +pub mod parquet_objects_extractor; diff --git a/rust/sdk-processor/src/steps/parquet_objects_processor/parquet_objects_extractor.rs b/rust/sdk-processor/src/steps/parquet_objects_processor/parquet_objects_extractor.rs new file mode 100644 index 00000000..11317b3b --- /dev/null +++ b/rust/sdk-processor/src/steps/parquet_objects_processor/parquet_objects_extractor.rs @@ -0,0 +1,83 @@ +use crate::{ + parquet_processors::{ParquetTypeEnum, ParquetTypeStructs}, + utils::parquet_extractor_helper::add_to_map_if_opted_in_for_backfill, +}; +use aptos_indexer_processor_sdk::{ + aptos_protos::transaction::v1::Transaction, + traits::{async_step::AsyncRunType, AsyncStep, NamedStep, Processable}, + types::transaction_context::TransactionContext, + utils::errors::ProcessorError, +}; +use async_trait::async_trait; +use processor::{ + db::{ + common::models::object_models::v2_objects::{CurrentObjectConvertible, ObjectConvertible}, + parquet::models::object_models::v2_objects::{CurrentObject, Object}, + }, + processors::objects_processor::process_objects, + utils::table_flags::TableFlags, +}; +use std::collections::HashMap; + +/// Extracts parquet data from transactions, allowing optional selection of specific tables. +pub struct ParquetObjectsExtractor +where + Self: Processable + Send + Sized + 'static, +{ + pub opt_in_tables: TableFlags, +} + +type ParquetTypeMap = HashMap; + +#[async_trait] +impl Processable for ParquetObjectsExtractor { + type Input = Vec; + type Output = ParquetTypeMap; + type RunType = AsyncRunType; + + async fn process( + &mut self, + transactions: TransactionContext, + ) -> anyhow::Result>, ProcessorError> { + let (raw_all_objects, raw_all_current_objects) = + process_objects(transactions.data, &mut None, 0, 0).await; + let parquet_objects: Vec = + raw_all_objects.into_iter().map(Object::from_raw).collect(); + + let parquet_current_objects: Vec = raw_all_current_objects + .into_iter() + .map(CurrentObject::from_raw) + .collect(); + + let mut map: HashMap = HashMap::new(); + + let data_types = [ + ( + TableFlags::OBJECTS, + ParquetTypeEnum::Objects, + ParquetTypeStructs::Object(parquet_objects), + ), + ( + TableFlags::CURRENT_OBJECTS, + ParquetTypeEnum::CurrentObjects, + ParquetTypeStructs::CurrentObject(parquet_current_objects), + ), + ]; + + // Populate the map based on opt-in tables + add_to_map_if_opted_in_for_backfill(self.opt_in_tables, &mut map, data_types.to_vec()); + + Ok(Some(TransactionContext { + data: map, + metadata: transactions.metadata, + })) + } +} + +impl AsyncStep for ParquetObjectsExtractor {} + +impl NamedStep for ParquetObjectsExtractor { + fn name(&self) -> String { + "ParquetObjectsExtractor".to_string() + } +}