From 0cad20631225ee00313d40feaa916ef3a3f5a08c Mon Sep 17 00:00:00 2001
From: Yuun Lim <38443641+yuunlimm@users.noreply.github.com>
Date: Wed, 18 Dec 2024 14:51:53 -0800
Subject: [PATCH] [parquet-sdk] migrate objects [pt2] (#641)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

## Description

- Migrate the objects processor.
- For parquet, we want to avoid DB lookups for deleted resources, so we replace the previous owner's fields with default values; these are handled during the load as part of the QC pipeline.

```
Self {
    transaction_version: txn_version,
    write_set_change_index,
    object_address: resource.address.clone(),
    owner_address: DELETED_RESOURCE_OWNER_ADDRESS.to_string(), <-------
    state_key_hash: resource.state_key_hash.clone(),
    guid_creation_num: BigDecimal::default(), <-------
    allow_ungated_transfer: false, <-------
    is_deleted: true,
    untransferrable: false, <-------
    block_timestamp: chrono::NaiveDateTime::default(), <-------
```

The fields marked with `<-------` are the ones that would otherwise be looked up from the previous owner.

## Test Plan

- Compared row counts across the DE team's BigQuery table, the testing BigQuery table, the legacy processor table, and the SDK processor table.

### objects

![Screenshot 2024-12-11 at 9 57 15 PM](https://github.com/user-attachments/assets/75943a99-2b80-4916-b54b-486d6d9b7f0f)
![Screenshot 2024-12-11 at 10 02 36 PM](https://github.com/user-attachments/assets/ee73187c-a99d-4560-b460-0ab0b0efa165)

### current objects

![Screenshot 2024-12-11 at 9 57 26 PM](https://github.com/user-attachments/assets/7aac4d67-4c19-400d-8254-bc17b619ea27)
![Screenshot 2024-12-11 at 10 05 24 PM](https://github.com/user-attachments/assets/f9e2184f-cffc-4e89-99d6-c21371c48d79)
---
 .../models/object_models/raw_v2_objects.rs    | 140 ++++++----
 .../raw_v2_token_ownerships.rs                |  32 +--
 rust/processor/src/db/parquet/models/mod.rs   |   1 +
 .../models/object_models/v2_objects.rs        | 103 +++++--
 .../src/processors/objects_processor.rs       | 252 ++++++++++--------
 .../parquet_token_v2_processor.rs             |  16 +-
 .../src/processors/token_v2_processor.rs      |  35 ++-
 rust/processor/src/utils/database.rs          |   6 +
 .../src/config/indexer_processor_config.rs    |   5 +
 .../src/config/processor_config.rs            |   7 +
 .../src/parquet_processors/mod.rs             |  21 +-
 .../parquet_objects_processor.rs              | 176 ++++++++++++
 .../src/steps/common/gcs_uploader.rs          |   2 +-
 rust/sdk-processor/src/steps/mod.rs           |   1 +
 .../objects_processor/objects_extractor.rs    | 134 ++--------
 .../steps/parquet_objects_processor/mod.rs    |   1 +
 .../parquet_objects_extractor.rs              |  85 ++++++
 .../parquet_token_v2_extractor.rs             |   2 +-
 .../token_v2_processor/token_v2_extractor.rs  |  10 +-
 19 files changed, 669 insertions(+), 360 deletions(-)
 create mode 100644 rust/sdk-processor/src/parquet_processors/parquet_objects_processor.rs
 create mode 100644 rust/sdk-processor/src/steps/parquet_objects_processor/mod.rs
 create mode 100644 rust/sdk-processor/src/steps/parquet_objects_processor/parquet_objects_extractor.rs

diff --git a/rust/processor/src/db/common/models/object_models/raw_v2_objects.rs b/rust/processor/src/db/common/models/object_models/raw_v2_objects.rs
index a80a504c7..d34712a66 100644
--- a/rust/processor/src/db/common/models/object_models/raw_v2_objects.rs
+++ b/rust/processor/src/db/common/models/object_models/raw_v2_objects.rs
@@ -9,7 +9,10 @@ use super::v2_object_utils::{CurrentObjectPK, ObjectAggregatedDataMapping};
 use crate::{
     db::postgres::models::default_models::move_resources::MoveResource,
     schema::current_objects,
-    utils::{database::DbPoolConnection, util::standardize_address},
+    utils::{
+        database::{DbContext, DbPoolConnection},
+        util::standardize_address,
+    },
 };
 use ahash::AHashMap;
 use aptos_protos::transaction::v1::{DeleteResource, WriteResource};
@@ -19,6 +22,8 @@ use diesel_async::RunQueryDsl;
 use field_count::FieldCount;
 use serde::{Deserialize, Serialize};
 
+const DELETED_RESOURCE_OWNER_ADDRESS: &str = "Unknown";
+
 #[derive(Clone, Debug, Deserialize, FieldCount, Serialize)]
 pub struct RawObject {
     pub transaction_version: i64,
@@ -30,6 +35,7 @@ pub struct RawObject {
     pub allow_ungated_transfer: bool,
     pub is_deleted: bool,
     pub untransferrable: bool,
+    pub block_timestamp: chrono::NaiveDateTime,
 }
 
 pub trait ObjectConvertible {
@@ -46,6 +52,7 @@ pub struct RawCurrentObject {
     pub last_transaction_version: i64,
     pub is_deleted: bool,
     pub untransferrable: bool,
+    pub block_timestamp: chrono::NaiveDateTime,
 }
 
 pub trait CurrentObjectConvertible {
@@ -73,14 +80,10 @@ impl RawObject {
         txn_version: i64,
         write_set_change_index: i64,
         object_metadata_mapping: &ObjectAggregatedDataMapping,
+        block_timestamp: chrono::NaiveDateTime,
     ) -> anyhow::Result<Option<(Self, RawCurrentObject)>> {
         let address = standardize_address(&write_resource.address.to_string());
-        println!("address: {:?}", address);
         if let Some(object_aggregated_metadata) = object_metadata_mapping.get(&address) {
-            println!(
-                "object_aggregated_metadata: {:?}",
-                object_aggregated_metadata
-            );
             // do something
             let object_with_metadata = object_aggregated_metadata.object.clone();
             let object_core = object_with_metadata.object_core;
@@ -101,6 +104,7 @@ impl RawObject {
                     allow_ungated_transfer: object_core.allow_ungated_transfer,
                     is_deleted: false,
                     untransferrable,
+                    block_timestamp,
                 },
                 RawCurrentObject {
                     object_address: address,
@@ -111,6 +115,7 @@ impl RawObject {
                     last_transaction_version: txn_version,
                     is_deleted: false,
                     untransferrable,
+                    block_timestamp,
                 },
             )))
         } else {
@@ -126,9 +131,8 @@ impl RawObject {
         txn_version: i64,
         write_set_change_index: i64,
         object_mapping: &AHashMap<CurrentObjectPK, RawCurrentObject>,
-        conn: &mut DbPoolConnection<'_>,
-        query_retries: u32,
-        query_retry_delay_ms: u64,
+        db_context: &mut Option<DbContext<'_>>,
+        block_timestamp: chrono::NaiveDateTime,
     ) -> anyhow::Result<Option<(Self, RawCurrentObject)>> {
         if delete_resource.type_str == "0x1::object::ObjectGroup" {
             let resource = MoveResource::from_delete_resource(
@@ -137,52 +141,91 @@ impl RawObject {
                 txn_version,
                 0, // Placeholder, this isn't used anyway
             );
-        let previous_object = if let Some(object) = object_mapping.get(&resource.address) {
-            object.clone()
+            // Handle the None conn case: parquet runs without a DB connection
+            if db_context.is_none() {
+                // Hack for parquet: skip the DB lookup and emit default values;
+                // they are reconciled during the load as part of the QC pipeline.
+                Ok(Some((
+                    Self {
+                        transaction_version: txn_version,
+                        write_set_change_index,
+                        object_address: resource.address.clone(),
+                        owner_address: DELETED_RESOURCE_OWNER_ADDRESS.to_string(),
+                        state_key_hash: resource.state_key_hash.clone(),
+                        guid_creation_num: BigDecimal::default(),
+                        allow_ungated_transfer: false,
+                        is_deleted: true,
+                        untransferrable: false,
+                        block_timestamp,
+                    },
+                    RawCurrentObject {
+                        object_address: resource.address.clone(),
+                        owner_address: DELETED_RESOURCE_OWNER_ADDRESS.to_string(),
+                        state_key_hash: resource.state_key_hash.clone(),
+                        last_guid_creation_num: BigDecimal::default(),
+                        allow_ungated_transfer: false,
+                        last_transaction_version: txn_version,
+                        is_deleted: true,
+                        untransferrable: false,
+                        block_timestamp,
+                    },
+                )))
             } else {
-            match Self::get_current_object(
-                conn,
-                &resource.address,
-                query_retries,
-                query_retry_delay_ms,
-            )
-            .await
-            {
-                Ok(object) => object,
-                Err(_) => {
-                    tracing::error!(
                         transaction_version = txn_version,
                         lookup_key = &resource.address,
                         "Missing current_object for object_address: {}. You probably should backfill db.",
                         resource.address,
                     );
-                    return Ok(None);
+                let previous_object = if let
Some(object) = object_mapping.get(&resource.address) { + object.clone() + } else if let Some(db_context) = db_context { + match Self::get_current_object( + &mut db_context.conn, + &resource.address, + db_context.query_retries, + db_context.query_retry_delay_ms, + ) + .await + { + Ok(object) => object, + Err(_) => { + tracing::error!( transaction_version = txn_version, lookup_key = &resource.address, "Missing current_object for object_address: {}. You probably should backfill db.", resource.address, ); - return Ok(None); + return Ok(None); + }, + } + } else { + tracing::error!( + transaction_version = txn_version, + lookup_key = &resource.address, + "Connection to DB is missing. You may need to investigate", + ); + return Ok(None); + }; + Ok(Some(( + Self { + transaction_version: txn_version, + write_set_change_index, + object_address: resource.address.clone(), + owner_address: previous_object.owner_address.clone(), + state_key_hash: resource.state_key_hash.clone(), + guid_creation_num: previous_object.last_guid_creation_num.clone(), + allow_ungated_transfer: previous_object.allow_ungated_transfer, + is_deleted: true, + untransferrable: previous_object.untransferrable, + block_timestamp, }, - } - }; - Ok(Some(( - Self { - transaction_version: txn_version, - write_set_change_index, - object_address: resource.address.clone(), - owner_address: previous_object.owner_address.clone(), - state_key_hash: resource.state_key_hash.clone(), - guid_creation_num: previous_object.last_guid_creation_num.clone(), - allow_ungated_transfer: previous_object.allow_ungated_transfer, - is_deleted: true, - untransferrable: previous_object.untransferrable, - }, - RawCurrentObject { - object_address: resource.address, - owner_address: previous_object.owner_address.clone(), - state_key_hash: resource.state_key_hash, - last_guid_creation_num: previous_object.last_guid_creation_num.clone(), - allow_ungated_transfer: previous_object.allow_ungated_transfer, - last_transaction_version: txn_version, - is_deleted: true, - untransferrable: previous_object.untransferrable, - }, - ))) + RawCurrentObject { + object_address: resource.address, + owner_address: previous_object.owner_address.clone(), + state_key_hash: resource.state_key_hash, + last_guid_creation_num: previous_object.last_guid_creation_num.clone(), + allow_ungated_transfer: previous_object.allow_ungated_transfer, + last_transaction_version: txn_version, + is_deleted: true, + untransferrable: previous_object.untransferrable, + block_timestamp, + }, + ))) + } } else { Ok(None) } @@ -210,6 +253,7 @@ impl RawObject { last_transaction_version: res.last_transaction_version, is_deleted: res.is_deleted, untransferrable: res.untransferrable, + block_timestamp: chrono::NaiveDateTime::default(), // this won't be used }); }, Err(_) => { diff --git a/rust/processor/src/db/common/models/token_v2_models/raw_v2_token_ownerships.rs b/rust/processor/src/db/common/models/token_v2_models/raw_v2_token_ownerships.rs index f266fc457..b6ded57da 100644 --- a/rust/processor/src/db/common/models/token_v2_models/raw_v2_token_ownerships.rs +++ b/rust/processor/src/db/common/models/token_v2_models/raw_v2_token_ownerships.rs @@ -21,7 +21,7 @@ use crate::{ }, schema::current_token_ownerships_v2, utils::{ - database::DbPoolConnection, + database::{DbContext, DbPoolConnection}, util::{ensure_not_negative, standardize_address}, }, }; @@ -259,9 +259,7 @@ impl RawTokenOwnershipV2 { prior_nft_ownership: &AHashMap, tokens_burned: &TokenV2Burned, object_metadatas: &ObjectAggregatedDataMapping, - conn: 
&mut Option>, - query_retries: u32, - query_retry_delay_ms: u64, + db_context: &mut Option>, ) -> anyhow::Result> { let token_data_id = standardize_address(&write_resource.address.to_string()); if tokens_burned @@ -328,9 +326,7 @@ impl RawTokenOwnershipV2 { txn_timestamp, prior_nft_ownership, tokens_burned, - conn, - query_retries, - query_retry_delay_ms, + db_context, ) .await; } @@ -346,9 +342,7 @@ impl RawTokenOwnershipV2 { txn_timestamp: chrono::NaiveDateTime, prior_nft_ownership: &AHashMap, tokens_burned: &TokenV2Burned, - conn: &mut Option>, - query_retries: u32, - query_retry_delay_ms: u64, + db_context: &mut Option>, ) -> anyhow::Result> { let token_address = standardize_address(&delete_resource.address.to_string()); Self::get_burned_nft_v2_helper( @@ -358,9 +352,7 @@ impl RawTokenOwnershipV2 { txn_timestamp, prior_nft_ownership, tokens_burned, - conn, - query_retries, - query_retry_delay_ms, + db_context, ) .await } @@ -372,9 +364,7 @@ impl RawTokenOwnershipV2 { txn_timestamp: chrono::NaiveDateTime, prior_nft_ownership: &AHashMap, tokens_burned: &TokenV2Burned, - conn: &mut Option>, - query_retries: u32, - query_retry_delay_ms: u64, + db_context: &mut Option>, ) -> anyhow::Result> { let token_address = standardize_address(token_address); if let Some(burn_event) = tokens_burned.get(&token_address) { @@ -389,7 +379,7 @@ impl RawTokenOwnershipV2 { match prior_nft_ownership.get(&token_address) { Some(inner) => inner.owner_address.clone(), None => { - match conn { + match db_context { None => { // TODO: update message tracing::error!( @@ -399,12 +389,12 @@ impl RawTokenOwnershipV2 { ); DEFAULT_OWNER_ADDRESS.to_string() }, - Some(ref mut conn) => { + Some(db_context) => { match CurrentTokenOwnershipV2Query::get_latest_owned_nft_by_token_data_id( - conn, + &mut db_context.conn, &token_address, - query_retries, - query_retry_delay_ms, + db_context.query_retries, + db_context.query_retry_delay_ms, ) .await { diff --git a/rust/processor/src/db/parquet/models/mod.rs b/rust/processor/src/db/parquet/models/mod.rs index de9402f4f..fb9742f2d 100644 --- a/rust/processor/src/db/parquet/models/mod.rs +++ b/rust/processor/src/db/parquet/models/mod.rs @@ -3,6 +3,7 @@ pub mod ans_models; pub mod default_models; pub mod event_models; pub mod fungible_asset_models; +pub mod object_models; pub mod token_v2_models; pub mod transaction_metadata_model; pub mod user_transaction_models; diff --git a/rust/processor/src/db/parquet/models/object_models/v2_objects.rs b/rust/processor/src/db/parquet/models/object_models/v2_objects.rs index 6587eac13..589713a4f 100644 --- a/rust/processor/src/db/parquet/models/object_models/v2_objects.rs +++ b/rust/processor/src/db/parquet/models/object_models/v2_objects.rs @@ -5,49 +5,112 @@ #![allow(clippy::extra_unused_lifetimes)] #![allow(clippy::unused_unit)] -use super::v2_object_utils::{CurrentObjectPK, ObjectAggregatedDataMapping}; use crate::{ - db::postgres::models::default_models::move_resources::MoveResource, - schema::{current_objects, objects}, - utils::{database::DbPoolConnection, util::standardize_address}, + bq_analytics::generic_parquet_processor::{GetTimeStamp, HasVersion, NamedTable}, + db::common::models::object_models::raw_v2_objects::{ + CurrentObjectConvertible, ObjectConvertible, RawCurrentObject, RawObject, + }, }; -use ahash::AHashMap; -use aptos_protos::transaction::v1::{DeleteResource, WriteResource}; -use bigdecimal::BigDecimal; -use diesel::prelude::*; -use diesel_async::RunQueryDsl; +use allocative_derive::Allocative; +use bigdecimal::ToPrimitive; 
 use field_count::FieldCount;
+use parquet_derive::ParquetRecordWriter;
 use serde::{Deserialize, Serialize};
 
-#[derive(Clone, Debug, Deserialize, FieldCount, Identifiable, Insertable, Serialize)]
-pub struct RawObject {
-    pub transaction_version: i64,
+#[derive(
+    Allocative, Clone, Debug, Default, Deserialize, FieldCount, ParquetRecordWriter, Serialize,
+)]
+pub struct Object {
+    pub txn_version: i64,
     pub write_set_change_index: i64,
     pub object_address: String,
     pub owner_address: String,
     pub state_key_hash: String,
-    pub guid_creation_num: BigDecimal,
+    pub guid_creation_num: u64, // BigDecimal,
     pub allow_ungated_transfer: bool,
     pub is_deleted: bool,
    pub untransferrable: bool,
+    #[allocative(skip)]
+    pub block_timestamp: chrono::NaiveDateTime,
 }
 
-pub trait ObjectConvertible {
-    fn from_raw(raw_item: RawObject) -> Self;
+impl NamedTable for Object {
+    const TABLE_NAME: &'static str = "objects";
 }
 
-#[derive(Clone, Debug, Deserialize, FieldCount, Identifiable, Insertable, Serialize)]
-pub struct RawCurrentObject {
+impl HasVersion for Object {
+    fn version(&self) -> i64 {
+        self.txn_version
+    }
+}
+
+impl GetTimeStamp for Object {
+    fn get_timestamp(&self) -> chrono::NaiveDateTime {
+        self.block_timestamp
+    }
+}
+
+impl ObjectConvertible for Object {
+    fn from_raw(raw_item: RawObject) -> Self {
+        Self {
+            txn_version: raw_item.transaction_version,
+            write_set_change_index: raw_item.write_set_change_index,
+            object_address: raw_item.object_address,
+            owner_address: raw_item.owner_address,
+            state_key_hash: raw_item.state_key_hash,
+            guid_creation_num: raw_item.guid_creation_num.to_u64().unwrap(),
+            allow_ungated_transfer: raw_item.allow_ungated_transfer,
+            is_deleted: raw_item.is_deleted,
+            untransferrable: raw_item.untransferrable,
+            block_timestamp: raw_item.block_timestamp,
+        }
+    }
+}
+
+#[derive(
+    Allocative, Clone, Debug, Default, Deserialize, FieldCount, ParquetRecordWriter, Serialize,
+)]
+pub struct CurrentObject {
     pub object_address: String,
     pub owner_address: String,
     pub state_key_hash: String,
     pub allow_ungated_transfer: bool,
-    pub last_guid_creation_num: BigDecimal,
+    pub last_guid_creation_num: u64, // BigDecimal,
     pub last_transaction_version: i64,
     pub is_deleted: bool,
     pub untransferrable: bool,
+    #[allocative(skip)]
+    pub block_timestamp: chrono::NaiveDateTime,
+}
+
+impl NamedTable for CurrentObject {
+    const TABLE_NAME: &'static str = "current_objects";
+}
+
+impl HasVersion for CurrentObject {
+    fn version(&self) -> i64 {
+        self.last_transaction_version
+    }
+}
+
+impl GetTimeStamp for CurrentObject {
+    fn get_timestamp(&self) -> chrono::NaiveDateTime {
+        self.block_timestamp
+    }
 }
 
-pub trait CurrentObjectConvertible {
-    fn from_raw(raw_item: RawCurrentObject) -> Self;
+impl CurrentObjectConvertible for CurrentObject {
+    fn from_raw(raw_item: RawCurrentObject) -> Self {
+        Self {
+            object_address: raw_item.object_address,
+            owner_address: raw_item.owner_address,
+            state_key_hash: raw_item.state_key_hash,
+            allow_ungated_transfer: raw_item.allow_ungated_transfer,
+            last_guid_creation_num: raw_item.last_guid_creation_num.to_u64().unwrap(),
+            last_transaction_version: raw_item.last_transaction_version,
+            is_deleted: raw_item.is_deleted,
+            untransferrable: raw_item.untransferrable,
+            block_timestamp: raw_item.block_timestamp,
+        }
+    }
 }
diff --git a/rust/processor/src/processors/objects_processor.rs b/rust/processor/src/processors/objects_processor.rs
index 79d3e87eb..edce3afe8 100644
--- a/rust/processor/src/processors/objects_processor.rs
+++ 
b/rust/processor/src/processors/objects_processor.rs @@ -21,9 +21,9 @@ use crate::{ gap_detectors::ProcessingResult, schema, utils::{ - database::{execute_in_chunks, get_config_table_chunk_size, ArcDbPool}, + database::{execute_in_chunks, get_config_table_chunk_size, ArcDbPool, DbContext}, table_flags::TableFlags, - util::standardize_address, + util::{parse_timestamp, standardize_address}, }, IndexerGrpcProcessorConfig, }; @@ -177,129 +177,26 @@ impl ProcessorTrait for ObjectsProcessor { ) -> anyhow::Result { let processing_start: std::time::Instant = std::time::Instant::now(); let last_transaction_timestamp = transactions.last().unwrap().timestamp.clone(); - - let mut conn = self.get_conn().await; + let conn = self.get_conn().await; let query_retries = self.config.query_retries; let query_retry_delay_ms = self.config.query_retry_delay_ms; - // Moving object handling here because we need a single object - // map through transactions for lookups - let mut all_objects = vec![]; - let mut all_current_objects = AHashMap::new(); - let mut object_metadata_helper: ObjectAggregatedDataMapping = AHashMap::new(); + let db_connection = DbContext { + conn, + query_retries, + query_retry_delay_ms, + }; - for txn in &transactions { - let txn_version = txn.version as i64; - let changes = &txn - .info - .as_ref() - .unwrap_or_else(|| { - panic!( - "Transaction info doesn't exist! Transaction {}", - txn_version - ) - }) - .changes; - // First pass to get all the object cores - for wsc in changes.iter() { - if let Change::WriteResource(wr) = wsc.change.as_ref().unwrap() { - let address = standardize_address(&wr.address.to_string()); - if let Some(object_with_metadata) = - ObjectWithMetadata::from_write_resource(wr).unwrap() - { - // Object core is the first struct that we need to get - object_metadata_helper.insert(address.clone(), ObjectAggregatedData { - object: object_with_metadata, - token: None, - fungible_asset_store: None, - // The following structs are unused in this processor - fungible_asset_metadata: None, - aptos_collection: None, - fixed_supply: None, - unlimited_supply: None, - concurrent_supply: None, - property_map: None, - transfer_events: vec![], - untransferable: None, - fungible_asset_supply: None, - concurrent_fungible_asset_supply: None, - concurrent_fungible_asset_balance: None, - token_identifier: None, - }); - } - } - } - - // Second pass to get object metadata - for wsc in changes.iter() { - if let Change::WriteResource(write_resource) = wsc.change.as_ref().unwrap() { - let address = standardize_address(&write_resource.address.to_string()); - if let Some(aggregated_data) = object_metadata_helper.get_mut(&address) { - if let Some(untransferable) = - Untransferable::from_write_resource(write_resource).unwrap() - { - aggregated_data.untransferable = Some(untransferable); - } - } - } - } - - // Second pass to construct the object data - for (index, wsc) in changes.iter().enumerate() { - let index: i64 = index as i64; - match wsc.change.as_ref().unwrap() { - Change::WriteResource(inner) => { - println!("index: {:?}", index); - if let Some((object, current_object)) = &RawObject::from_write_resource( - inner, - txn_version, - index, - &object_metadata_helper, - ) - .unwrap() - { - all_objects.push(object.clone()); - all_current_objects - .insert(object.object_address.clone(), current_object.clone()); - } - }, - Change::DeleteResource(inner) => { - // Passing all_current_objects into the function so that we can get the owner of the deleted - // resource if it was handled in the same batch 
- if let Some((object, current_object)) = RawObject::from_delete_resource( - inner, - txn_version, - index, - &all_current_objects, - &mut conn, - query_retries, - query_retry_delay_ms, - ) - .await - .unwrap() - { - all_objects.push(object.clone()); - all_current_objects - .insert(object.object_address.clone(), current_object.clone()); - } - }, - _ => {}, - }; - } - } - - // Sort by PK - let mut all_current_objects = all_current_objects - .into_values() - .collect::>(); - all_current_objects.sort_by(|a, b| a.object_address.cmp(&b.object_address)); + let (mut raw_all_objects, raw_all_current_objects) = + process_objects(transactions, &mut Some(db_connection)).await; if self.deprecated_tables.contains(TableFlags::OBJECTS) { - all_objects.clear(); + raw_all_objects.clear(); } - let postgres_objects: Vec = all_objects.into_iter().map(Object::from_raw).collect(); - let postgres_current_objects: Vec = all_current_objects + let postgres_objects: Vec = + raw_all_objects.into_iter().map(Object::from_raw).collect(); + let postgres_current_objects: Vec = raw_all_current_objects .into_iter() .map(CurrentObject::from_raw) .collect(); @@ -345,3 +242,124 @@ impl ProcessorTrait for ObjectsProcessor { &self.connection_pool } } + +pub async fn process_objects( + transactions: Vec, + db_context: &mut Option>, +) -> (Vec, Vec) { + // Moving object handling here because we need a single object + // map through transactions for lookups + let mut all_objects = vec![]; + let mut all_current_objects = AHashMap::new(); + let mut object_metadata_helper: ObjectAggregatedDataMapping = AHashMap::new(); + + for txn in &transactions { + let txn_version = txn.version as i64; + let changes = &txn + .info + .as_ref() + .unwrap_or_else(|| { + panic!( + "Transaction info doesn't exist! 
Transaction {}", + txn_version + ) + }) + .changes; + + let txn_timestamp = parse_timestamp(txn.timestamp.as_ref().unwrap(), txn_version); + + // First pass to get all the object cores + for wsc in changes.iter() { + if let Change::WriteResource(wr) = wsc.change.as_ref().unwrap() { + let address = standardize_address(&wr.address.to_string()); + if let Some(object_with_metadata) = + ObjectWithMetadata::from_write_resource(wr).unwrap() + { + // Object core is the first struct that we need to get + object_metadata_helper.insert(address.clone(), ObjectAggregatedData { + object: object_with_metadata, + token: None, + fungible_asset_store: None, + // The following structs are unused in this processor + fungible_asset_metadata: None, + aptos_collection: None, + fixed_supply: None, + unlimited_supply: None, + concurrent_supply: None, + property_map: None, + transfer_events: vec![], + untransferable: None, + fungible_asset_supply: None, + concurrent_fungible_asset_supply: None, + concurrent_fungible_asset_balance: None, + token_identifier: None, + }); + } + } + } + + // Second pass to get object metadata + for wsc in changes.iter() { + if let Change::WriteResource(write_resource) = wsc.change.as_ref().unwrap() { + let address = standardize_address(&write_resource.address.to_string()); + if let Some(aggregated_data) = object_metadata_helper.get_mut(&address) { + if let Some(untransferable) = + Untransferable::from_write_resource(write_resource).unwrap() + { + aggregated_data.untransferable = Some(untransferable); + } + } + } + } + + // Second pass to construct the object data + for (index, wsc) in changes.iter().enumerate() { + let index: i64 = index as i64; + match wsc.change.as_ref().unwrap() { + Change::WriteResource(inner) => { + if let Some((object, current_object)) = &RawObject::from_write_resource( + inner, + txn_version, + index, + &object_metadata_helper, + txn_timestamp, + ) + .unwrap() + { + all_objects.push(object.clone()); + all_current_objects + .insert(object.object_address.clone(), current_object.clone()); + } + }, + Change::DeleteResource(inner) => { + // Passing all_current_objects into the function so that we can get the owner of the deleted + // resource if it was handled in the same batch + if let Some((object, current_object)) = RawObject::from_delete_resource( + inner, + txn_version, + index, + &all_current_objects, + db_context, + txn_timestamp, + ) + .await + .unwrap() + { + all_objects.push(object.clone()); + all_current_objects + .insert(object.object_address.clone(), current_object.clone()); + } + }, + _ => {}, + }; + } + } + + // Sort by PK + let mut all_current_objects = all_current_objects + .into_values() + .collect::>(); + all_current_objects.sort_by(|a, b| a.object_address.cmp(&b.object_address)); + + (all_objects, all_current_objects) +} diff --git a/rust/processor/src/processors/parquet_processors/parquet_token_v2_processor.rs b/rust/processor/src/processors/parquet_processors/parquet_token_v2_processor.rs index 6e4ece293..2f07cac1a 100644 --- a/rust/processor/src/processors/parquet_processors/parquet_token_v2_processor.rs +++ b/rust/processor/src/processors/parquet_processors/parquet_token_v2_processor.rs @@ -35,7 +35,7 @@ use crate::{ processors::{parquet_processors::ParquetProcessorTrait, ProcessorName, ProcessorTrait}, utils::{ counters::PROCESSOR_UNKNOWN_TYPE_COUNT, - database::{ArcDbPool, DbPoolConnection}, + database::{ArcDbPool, DbContext}, util::{parse_timestamp, standardize_address}, }, }; @@ -139,8 +139,6 @@ impl ProcessorTrait for 
ParquetTokenV2Processor { &transactions, &table_handle_to_owner, &mut None, - 0, - 0, &mut transaction_version_to_struct_count, ) .await; @@ -193,9 +191,7 @@ impl ProcessorTrait for ParquetTokenV2Processor { async fn parse_v2_token( transactions: &[Transaction], table_handle_to_owner: &TableHandleToOwner, - conn: &mut Option>, - query_retries: u32, - query_retry_delay_ms: u64, + db_context: &mut Option>, transaction_version_to_struct_count: &mut AHashMap, ) -> (Vec, Vec) { // Token V2 and V1 combined @@ -459,9 +455,7 @@ async fn parse_v2_token( &prior_nft_ownership, &tokens_burned, &token_v2_metadata_helper, - conn, - query_retries, - query_retry_delay_ms, + db_context, ) .await .unwrap() @@ -490,9 +484,7 @@ async fn parse_v2_token( txn_timestamp, &prior_nft_ownership, &tokens_burned, - conn, - query_retries, - query_retry_delay_ms, + db_context, ) .await .unwrap() diff --git a/rust/processor/src/processors/token_v2_processor.rs b/rust/processor/src/processors/token_v2_processor.rs index be9faa00d..95c9af29a 100644 --- a/rust/processor/src/processors/token_v2_processor.rs +++ b/rust/processor/src/processors/token_v2_processor.rs @@ -54,7 +54,7 @@ use crate::{ schema, utils::{ counters::PROCESSOR_UNKNOWN_TYPE_COUNT, - database::{execute_in_chunks, get_config_table_chunk_size, ArcDbPool, DbPoolConnection}, + database::{execute_in_chunks, get_config_table_chunk_size, ArcDbPool, DbContext}, table_flags::TableFlags, util::{get_entry_function_from_user_request, parse_timestamp, standardize_address}, }, @@ -620,8 +620,11 @@ impl ProcessorTrait for TokenV2Processor { let table_handle_to_owner = TableMetadataForToken::get_table_handle_to_owner_from_transactions(&transactions); - let query_retries = self.config.query_retries; - let query_retry_delay_ms = self.config.query_retry_delay_ms; + let db_connection = DbContext { + conn, + query_retries: self.config.query_retries, + query_retry_delay_ms: self.config.query_retry_delay_ms, + }; // Token V2 processing which includes token v1 let ( mut collections_v2, @@ -639,9 +642,7 @@ impl ProcessorTrait for TokenV2Processor { ) = parse_v2_token( &transactions, &table_handle_to_owner, - &mut Some(conn), - query_retries, - query_retry_delay_ms, + &mut Some(db_connection), ) .await; @@ -793,15 +794,13 @@ pub async fn parse_v2_token_for_parquet( Vec, Vec, ) { - parse_v2_token(transactions, table_handle_to_owner, &mut None, 0, 0).await + parse_v2_token(transactions, table_handle_to_owner, &mut None).await } pub async fn parse_v2_token( transactions: &[Transaction], table_handle_to_owner: &TableHandleToOwner, - conn: &mut Option>, - query_retries: u32, - query_retry_delay_ms: u64, + db_context: &mut Option>, ) -> ( Vec, Vec, @@ -1025,7 +1024,7 @@ pub async fn parse_v2_token( Change::WriteTableItem(table_item) => { // TODO: revisit when we migrate collection_v2 for parquet // for not it will be only handled for postgres - if let Some(ref mut conn) = conn { + if let Some(ref mut db_context) = db_context { if let Some((collection, current_collection)) = CollectionV2::get_v1_from_write_table_item( table_item, @@ -1033,9 +1032,9 @@ pub async fn parse_v2_token( wsc_index, txn_timestamp, table_handle_to_owner, - conn, - query_retries, - query_retry_delay_ms, + &mut db_context.conn, + db_context.query_retries, + db_context.query_retry_delay_ms, ) .await .unwrap() @@ -1268,9 +1267,7 @@ pub async fn parse_v2_token( &prior_nft_ownership, &tokens_burned, &token_v2_metadata_helper, - conn, - query_retries, - query_retry_delay_ms, + db_context, ) .await .unwrap() @@ -1339,9 
+1336,7 @@ pub async fn parse_v2_token( txn_timestamp, &prior_nft_ownership, &tokens_burned, - conn, - query_retries, - query_retry_delay_ms, + db_context, ) .await .unwrap() diff --git a/rust/processor/src/utils/database.rs b/rust/processor/src/utils/database.rs index ad6a280cb..da3383059 100644 --- a/rust/processor/src/utils/database.rs +++ b/rust/processor/src/utils/database.rs @@ -284,3 +284,9 @@ where Ok(()) } } + +pub struct DbContext<'a> { + pub conn: DbPoolConnection<'a>, + pub query_retries: u32, + pub query_retry_delay_ms: u64, +} diff --git a/rust/sdk-processor/src/config/indexer_processor_config.rs b/rust/sdk-processor/src/config/indexer_processor_config.rs index 932efbc47..fc8a20916 100644 --- a/rust/sdk-processor/src/config/indexer_processor_config.rs +++ b/rust/sdk-processor/src/config/indexer_processor_config.rs @@ -9,6 +9,7 @@ use crate::{ parquet_default_processor::ParquetDefaultProcessor, parquet_events_processor::ParquetEventsProcessor, parquet_fungible_asset_processor::ParquetFungibleAssetProcessor, + parquet_objects_processor::ParquetObjectsProcessor, parquet_token_v2_processor::ParquetTokenV2Processor, parquet_transaction_metadata_processor::ParquetTransactionMetadataProcessor, parquet_user_transaction_processor::ParquetUserTransactionsProcessor, @@ -122,6 +123,10 @@ impl RunnableConfig for IndexerProcessorConfig { let parquet_ans_processor = ParquetAnsProcessor::new(self.clone()).await?; parquet_ans_processor.run_processor().await }, + ProcessorConfig::ParquetObjectsProcessor(_) => { + let parquet_objects_processor = ParquetObjectsProcessor::new(self.clone()).await?; + parquet_objects_processor.run_processor().await + }, } } diff --git a/rust/sdk-processor/src/config/processor_config.rs b/rust/sdk-processor/src/config/processor_config.rs index 670dd02ea..ffbabfcc1 100644 --- a/rust/sdk-processor/src/config/processor_config.rs +++ b/rust/sdk-processor/src/config/processor_config.rs @@ -32,6 +32,7 @@ use processor::{ }, parquet_v2_fungible_metadata::FungibleAssetMetadataModel, }, + object_models::v2_objects::{CurrentObject, Object}, token_v2_models::{ token_claims::CurrentTokenPendingClaim, v1_token_royalty::CurrentTokenRoyaltyV1, @@ -100,6 +101,7 @@ pub enum ProcessorConfig { ParquetTransactionMetadataProcessor(ParquetDefaultProcessorConfig), ParquetAccountTransactionsProcessor(ParquetDefaultProcessorConfig), ParquetTokenV2Processor(ParquetDefaultProcessorConfig), + ParquetObjectsProcessor(ParquetDefaultProcessorConfig), } impl ProcessorConfig { @@ -121,6 +123,7 @@ impl ProcessorConfig { | ProcessorConfig::ParquetTransactionMetadataProcessor(config) | ProcessorConfig::ParquetAccountTransactionsProcessor(config) | ProcessorConfig::ParquetTokenV2Processor(config) + | ProcessorConfig::ParquetObjectsProcessor(config) | ProcessorConfig::ParquetFungibleAssetProcessor(config) => config, ProcessorConfig::ParquetAnsProcessor(config) => &config.default, _ => { @@ -198,6 +201,10 @@ impl ProcessorConfig { TokenOwnershipV2::TABLE_NAME.to_string(), CurrentTokenOwnershipV2::TABLE_NAME.to_string(), ]), + ProcessorName::ParquetObjectsProcessor => HashSet::from([ + Object::TABLE_NAME.to_string(), + CurrentObject::TABLE_NAME.to_string(), + ]), _ => HashSet::new(), // Default case for unsupported processors } } diff --git a/rust/sdk-processor/src/parquet_processors/mod.rs b/rust/sdk-processor/src/parquet_processors/mod.rs index aab7ab7e8..b3028c09a 100644 --- a/rust/sdk-processor/src/parquet_processors/mod.rs +++ b/rust/sdk-processor/src/parquet_processors/mod.rs @@ -36,6 +36,7 @@ use 
processor::{ }, parquet_v2_fungible_metadata::FungibleAssetMetadataModel, }, + object_models::v2_objects::{CurrentObject, Object}, token_v2_models::{ token_claims::CurrentTokenPendingClaim, v1_token_royalty::CurrentTokenRoyaltyV1, @@ -63,6 +64,7 @@ pub mod parquet_ans_processor; pub mod parquet_default_processor; pub mod parquet_events_processor; pub mod parquet_fungible_asset_processor; +pub mod parquet_objects_processor; pub mod parquet_token_v2_processor; pub mod parquet_transaction_metadata_processor; pub mod parquet_user_transaction_processor; @@ -129,6 +131,9 @@ pub enum ParquetTypeEnum { CurrentTokenDatasV2, TokenOwnershipsV2, CurrentTokenOwnershipsV2, + // Objects + Objects, + CurrentObjects, } /// Trait for handling various Parquet types. @@ -233,7 +238,8 @@ impl_parquet_trait!( CurrentTokenOwnershipV2, ParquetTypeEnum::CurrentTokenOwnershipsV2 ); - +impl_parquet_trait!(Object, ParquetTypeEnum::Objects); +impl_parquet_trait!(CurrentObject, ParquetTypeEnum::CurrentObjects); #[derive(Debug, Clone)] #[enum_dispatch(ParquetTypeTrait)] pub enum ParquetTypeStructs { @@ -274,6 +280,8 @@ pub enum ParquetTypeStructs { CurrentTokenDataV2(Vec), TokenOwnershipV2(Vec), CurrentTokenOwnershipV2(Vec), + Object(Vec), + CurrentObject(Vec), } impl ParquetTypeStructs { @@ -336,6 +344,8 @@ impl ParquetTypeStructs { ParquetTypeEnum::CurrentTokenOwnershipsV2 => { ParquetTypeStructs::CurrentTokenOwnershipV2(Vec::new()) }, + ParquetTypeEnum::Objects => ParquetTypeStructs::Object(Vec::new()), + ParquetTypeEnum::CurrentObjects => ParquetTypeStructs::CurrentObject(Vec::new()), } } @@ -521,6 +531,15 @@ impl ParquetTypeStructs { ) => { handle_append!(self_data, other_data) }, + (ParquetTypeStructs::Object(self_data), ParquetTypeStructs::Object(other_data)) => { + handle_append!(self_data, other_data) + }, + ( + ParquetTypeStructs::CurrentObject(self_data), + ParquetTypeStructs::CurrentObject(other_data), + ) => { + handle_append!(self_data, other_data) + }, _ => Err(ProcessorError::ProcessError { message: "Mismatched buffer types in append operation".to_string(), }), diff --git a/rust/sdk-processor/src/parquet_processors/parquet_objects_processor.rs b/rust/sdk-processor/src/parquet_processors/parquet_objects_processor.rs new file mode 100644 index 000000000..f91c23920 --- /dev/null +++ b/rust/sdk-processor/src/parquet_processors/parquet_objects_processor.rs @@ -0,0 +1,176 @@ +use crate::{ + config::{ + db_config::DbConfig, indexer_processor_config::IndexerProcessorConfig, + processor_config::ProcessorConfig, + }, + parquet_processors::{ + initialize_database_pool, initialize_gcs_client, initialize_parquet_buffer_step, + set_backfill_table_flag, ParquetTypeEnum, + }, + steps::{ + common::{ + parquet_version_tracker_step::ParquetVersionTrackerStep, + processor_status_saver::get_processor_status_saver, + }, + parquet_objects_processor::parquet_objects_extractor::ParquetObjectsExtractor, + }, + utils::{ + chain_id::check_or_update_chain_id, + database::{run_migrations, ArcDbPool}, + starting_version::get_min_last_success_version_parquet, + }, +}; +use anyhow::Context; +use aptos_indexer_processor_sdk::{ + aptos_indexer_transaction_stream::{TransactionStream, TransactionStreamConfig}, + builder::ProcessorBuilder, + common_steps::{TransactionStreamStep, DEFAULT_UPDATE_PROCESSOR_STATUS_SECS}, + traits::{processor_trait::ProcessorTrait, IntoRunnableStep}, +}; +use parquet::schema::types::Type; +use processor::{ + bq_analytics::generic_parquet_processor::HasParquetSchema, + 
db::parquet::models::object_models::v2_objects::{CurrentObject, Object}, +}; +use std::{collections::HashMap, sync::Arc}; +use tracing::{debug, info}; + +pub struct ParquetObjectsProcessor { + pub config: IndexerProcessorConfig, + pub db_pool: ArcDbPool, // for processor status +} + +impl ParquetObjectsProcessor { + pub async fn new(config: IndexerProcessorConfig) -> anyhow::Result { + let db_pool = initialize_database_pool(&config.db_config).await?; + Ok(Self { config, db_pool }) + } +} +#[async_trait::async_trait] +impl ProcessorTrait for ParquetObjectsProcessor { + fn name(&self) -> &'static str { + self.config.processor_config.name() + } + + async fn run_processor(&self) -> anyhow::Result<()> { + // Run Migrations + let parquet_db_config = match self.config.db_config { + DbConfig::ParquetConfig(ref parquet_config) => { + run_migrations( + parquet_config.connection_string.clone(), + self.db_pool.clone(), + ) + .await; + parquet_config + }, + _ => { + return Err(anyhow::anyhow!( + "Invalid db config for ParquetObjectsProcessor {:?}", + self.config.db_config + )); + }, + }; + + // Check and update the ledger chain id to ensure we're indexing the correct chain + let grpc_chain_id = TransactionStream::new(self.config.transaction_stream_config.clone()) + .await? + .get_chain_id() + .await?; + check_or_update_chain_id(grpc_chain_id as i64, self.db_pool.clone()).await?; + + let parquet_processor_config = match self.config.processor_config.clone() { + ProcessorConfig::ParquetObjectsProcessor(parquet_processor_config) => { + parquet_processor_config + }, + _ => { + return Err(anyhow::anyhow!( + "Invalid processor configuration for ParquetObjectsProcessor {:?}", + self.config.processor_config + )); + }, + }; + + let processor_status_table_names = self + .config + .processor_config + .get_processor_status_table_names() + .context("Failed to get table names for the processor status table")?; + + let starting_version = get_min_last_success_version_parquet( + &self.config, + self.db_pool.clone(), + processor_status_table_names, + ) + .await?; + + // Define processor transaction stream config + let transaction_stream = TransactionStreamStep::new(TransactionStreamConfig { + starting_version: Some(starting_version), + ..self.config.transaction_stream_config.clone() + }) + .await?; + + let backfill_table = set_backfill_table_flag(parquet_processor_config.backfill_table); + let parquet_objects_extractor = ParquetObjectsExtractor { + opt_in_tables: backfill_table, + }; + + let gcs_client = + initialize_gcs_client(parquet_db_config.google_application_credentials.clone()).await; + + let parquet_type_to_schemas: HashMap> = [ + (ParquetTypeEnum::Objects, Object::schema()), + (ParquetTypeEnum::CurrentObjects, CurrentObject::schema()), + ] + .into_iter() + .collect(); + + let default_size_buffer_step = initialize_parquet_buffer_step( + gcs_client.clone(), + parquet_type_to_schemas, + parquet_processor_config.upload_interval, + parquet_processor_config.max_buffer_size, + parquet_db_config.bucket_name.clone(), + parquet_db_config.bucket_root.clone(), + self.name().to_string(), + ) + .await + .unwrap_or_else(|e| { + panic!("Failed to initialize parquet buffer step: {:?}", e); + }); + + let parquet_version_tracker_step = ParquetVersionTrackerStep::new( + get_processor_status_saver(self.db_pool.clone(), self.config.clone()), + DEFAULT_UPDATE_PROCESSOR_STATUS_SECS, + ); + + let channel_size = parquet_processor_config.channel_size; + + // Connect processor steps together + let (_, buffer_receiver) = 
ProcessorBuilder::new_with_inputless_first_step( + transaction_stream.into_runnable_step(), + ) + .connect_to(parquet_objects_extractor.into_runnable_step(), channel_size) + .connect_to(default_size_buffer_step.into_runnable_step(), channel_size) + .connect_to( + parquet_version_tracker_step.into_runnable_step(), + channel_size, + ) + .end_and_return_output_receiver(channel_size); + + loop { + match buffer_receiver.recv().await { + Ok(txn_context) => { + debug!( + "Finished processing versions [{:?}, {:?}]", + txn_context.metadata.start_version, txn_context.metadata.end_version, + ); + }, + Err(e) => { + info!("No more transactions in channel: {:?}", e); + break Ok(()); + }, + } + } + } +} diff --git a/rust/sdk-processor/src/steps/common/gcs_uploader.rs b/rust/sdk-processor/src/steps/common/gcs_uploader.rs index 1dc3575ba..074f32957 100644 --- a/rust/sdk-processor/src/steps/common/gcs_uploader.rs +++ b/rust/sdk-processor/src/steps/common/gcs_uploader.rs @@ -127,7 +127,7 @@ impl GCSUploader { for<'a> &'a [ParquetType]: RecordWriter, { if data.is_empty() { - debug!("Buffer is empty, skipping upload."); + println!("Buffer is empty, skipping upload."); return Ok(()); } diff --git a/rust/sdk-processor/src/steps/mod.rs b/rust/sdk-processor/src/steps/mod.rs index 643eb58c3..de856c9f5 100644 --- a/rust/sdk-processor/src/steps/mod.rs +++ b/rust/sdk-processor/src/steps/mod.rs @@ -15,6 +15,7 @@ pub mod parquet_ans_processor; pub mod parquet_default_processor; pub mod parquet_events_processor; pub mod parquet_fungible_asset_processor; +pub mod parquet_objects_processor; pub mod parquet_token_v2_processor; pub mod parquet_transaction_metadata_processor; pub mod parquet_user_transaction_processor; diff --git a/rust/sdk-processor/src/steps/objects_processor/objects_extractor.rs b/rust/sdk-processor/src/steps/objects_processor/objects_extractor.rs index 92ce006a9..519681f36 100644 --- a/rust/sdk-processor/src/steps/objects_processor/objects_extractor.rs +++ b/rust/sdk-processor/src/steps/objects_processor/objects_extractor.rs @@ -1,28 +1,20 @@ use crate::utils::database::ArcDbPool; -use ahash::AHashMap; use aptos_indexer_processor_sdk::{ - aptos_protos::transaction::v1::{write_set_change::Change, Transaction}, + aptos_protos::transaction::v1::Transaction, traits::{async_step::AsyncRunType, AsyncStep, NamedStep, Processable}, types::transaction_context::TransactionContext, - utils::{convert::standardize_address, errors::ProcessorError}, + utils::errors::ProcessorError, }; use async_trait::async_trait; use processor::{ db::{ - common::models::object_models::{ - raw_v2_objects::{ - CurrentObjectConvertible, ObjectConvertible, RawCurrentObject, RawObject, - }, - v2_object_utils::{ - ObjectAggregatedData, ObjectAggregatedDataMapping, ObjectWithMetadata, - }, - }, - postgres::models::{ - object_models::v2_objects::{CurrentObject, Object}, - resources::FromWriteResource, + common::models::object_models::raw_v2_objects::{ + CurrentObjectConvertible, ObjectConvertible, }, + postgres::models::object_models::v2_objects::{CurrentObject, Object}, }, - utils::table_flags::TableFlags, + processors::objects_processor::process_objects, + utils::{database::DbContext, table_flags::TableFlags}, }; /// Extracts fungible asset events, metadata, balances, and v1 supply from transactions @@ -62,7 +54,7 @@ impl Processable for ObjectsExtractor { &mut self, transactions: TransactionContext>, ) -> Result, Vec)>>, ProcessorError> { - let mut conn = self + let conn = self .conn_pool .get() .await @@ -72,113 +64,23 @@ impl Processable 
for ObjectsExtractor { })?; let query_retries = self.query_retries; let query_retry_delay_ms = self.query_retry_delay_ms; + let db_connection = DbContext { + conn, + query_retries, + query_retry_delay_ms, + }; - // Moving object handling here because we need a single object - // map through transactions for lookups - let mut all_objects = vec![]; - let mut all_current_objects = AHashMap::new(); - let mut object_metadata_helper: ObjectAggregatedDataMapping = AHashMap::new(); - - for txn in &transactions.data { - let txn_version = txn.version as i64; - let changes = &txn - .info - .as_ref() - .unwrap_or_else(|| { - panic!( - "Transaction info doesn't exist! Transaction {}", - txn_version - ) - }) - .changes; - - // First pass to get all the object cores - for wsc in changes.iter() { - if let Change::WriteResource(wr) = wsc.change.as_ref().unwrap() { - let address = standardize_address(&wr.address.to_string()); - if let Some(object_with_metadata) = - ObjectWithMetadata::from_write_resource(wr).unwrap() - { - // Object core is the first struct that we need to get - object_metadata_helper.insert(address.clone(), ObjectAggregatedData { - object: object_with_metadata, - token: None, - fungible_asset_store: None, - // The following structs are unused in this processor - fungible_asset_metadata: None, - aptos_collection: None, - fixed_supply: None, - unlimited_supply: None, - concurrent_supply: None, - property_map: None, - transfer_events: vec![], - untransferable: None, - fungible_asset_supply: None, - concurrent_fungible_asset_supply: None, - concurrent_fungible_asset_balance: None, - token_identifier: None, - }); - } - } - } - - // Second pass to construct the object data - for (index, wsc) in changes.iter().enumerate() { - let index: i64 = index as i64; - match wsc.change.as_ref().unwrap() { - Change::WriteResource(inner) => { - if let Some((object, current_object)) = &RawObject::from_write_resource( - inner, - txn_version, - index, - &object_metadata_helper, - ) - .unwrap() - { - all_objects.push(object.clone()); - all_current_objects - .insert(object.object_address.clone(), current_object.clone()); - } - }, - Change::DeleteResource(inner) => { - // Passing all_current_objects into the function so that we can get the owner of the deleted - // resource if it was handled in the same batch - if let Some((object, current_object)) = RawObject::from_delete_resource( - inner, - txn_version, - index, - &all_current_objects, - &mut conn, - query_retries, - query_retry_delay_ms, - ) - .await - .unwrap() - { - all_objects.push(object.clone()); - all_current_objects - .insert(object.object_address.clone(), current_object.clone()); - } - }, - _ => {}, - }; - } - } - - // Sort by PK - let mut all_current_objects = all_current_objects - .into_values() - .collect::>(); - all_current_objects.sort_by(|a, b| a.object_address.cmp(&b.object_address)); + let (mut raw_all_objects, raw_all_current_objects) = + process_objects(transactions.data, &mut Some(db_connection)).await; if self.deprecated_tables.contains(TableFlags::OBJECTS) { - all_objects.clear(); + raw_all_objects.clear(); } let postgres_all_objects: Vec = - all_objects.into_iter().map(Object::from_raw).collect(); + raw_all_objects.into_iter().map(Object::from_raw).collect(); - let postgres_all_current_objects: Vec = all_current_objects + let postgres_all_current_objects: Vec = raw_all_current_objects .into_iter() .map(CurrentObject::from_raw) .collect(); diff --git a/rust/sdk-processor/src/steps/parquet_objects_processor/mod.rs 
b/rust/sdk-processor/src/steps/parquet_objects_processor/mod.rs new file mode 100644 index 000000000..7dfc9a12d --- /dev/null +++ b/rust/sdk-processor/src/steps/parquet_objects_processor/mod.rs @@ -0,0 +1 @@ +pub mod parquet_objects_extractor; diff --git a/rust/sdk-processor/src/steps/parquet_objects_processor/parquet_objects_extractor.rs b/rust/sdk-processor/src/steps/parquet_objects_processor/parquet_objects_extractor.rs new file mode 100644 index 000000000..9fd53db78 --- /dev/null +++ b/rust/sdk-processor/src/steps/parquet_objects_processor/parquet_objects_extractor.rs @@ -0,0 +1,85 @@ +use crate::{ + parquet_processors::{ParquetTypeEnum, ParquetTypeStructs}, + utils::parquet_extractor_helper::add_to_map_if_opted_in_for_backfill, +}; +use aptos_indexer_processor_sdk::{ + aptos_protos::transaction::v1::Transaction, + traits::{async_step::AsyncRunType, AsyncStep, NamedStep, Processable}, + types::transaction_context::TransactionContext, + utils::errors::ProcessorError, +}; +use async_trait::async_trait; +use processor::{ + db::{ + common::models::object_models::raw_v2_objects::{ + CurrentObjectConvertible, ObjectConvertible, + }, + parquet::models::object_models::v2_objects::{CurrentObject, Object}, + }, + processors::objects_processor::process_objects, + utils::table_flags::TableFlags, +}; +use std::collections::HashMap; + +/// Extracts parquet data from transactions, allowing optional selection of specific tables. +pub struct ParquetObjectsExtractor +where + Self: Processable + Send + Sized + 'static, +{ + pub opt_in_tables: TableFlags, +} + +type ParquetTypeMap = HashMap; + +#[async_trait] +impl Processable for ParquetObjectsExtractor { + type Input = Vec; + type Output = ParquetTypeMap; + type RunType = AsyncRunType; + + async fn process( + &mut self, + transactions: TransactionContext, + ) -> anyhow::Result>, ProcessorError> { + let (raw_all_objects, raw_all_current_objects) = + process_objects(transactions.data, &mut None).await; + let parquet_objects: Vec = + raw_all_objects.into_iter().map(Object::from_raw).collect(); + + let parquet_current_objects: Vec = raw_all_current_objects + .into_iter() + .map(CurrentObject::from_raw) + .collect(); + + let mut map: HashMap = HashMap::new(); + + let data_types = [ + ( + TableFlags::OBJECTS, + ParquetTypeEnum::Objects, + ParquetTypeStructs::Object(parquet_objects), + ), + ( + TableFlags::CURRENT_OBJECTS, + ParquetTypeEnum::CurrentObjects, + ParquetTypeStructs::CurrentObject(parquet_current_objects), + ), + ]; + + // Populate the map based on opt-in tables + add_to_map_if_opted_in_for_backfill(self.opt_in_tables, &mut map, data_types.to_vec()); + + Ok(Some(TransactionContext { + data: map, + metadata: transactions.metadata, + })) + } +} + +impl AsyncStep for ParquetObjectsExtractor {} + +impl NamedStep for ParquetObjectsExtractor { + fn name(&self) -> String { + "ParquetObjectsExtractor".to_string() + } +} diff --git a/rust/sdk-processor/src/steps/parquet_token_v2_processor/parquet_token_v2_extractor.rs b/rust/sdk-processor/src/steps/parquet_token_v2_processor/parquet_token_v2_extractor.rs index 59a0125e2..cf59f0f59 100644 --- a/rust/sdk-processor/src/steps/parquet_token_v2_processor/parquet_token_v2_extractor.rs +++ b/rust/sdk-processor/src/steps/parquet_token_v2_processor/parquet_token_v2_extractor.rs @@ -75,7 +75,7 @@ impl Processable for ParquetTokenV2Extractor { raw_current_token_v2_metadata, raw_current_token_royalties_v1, raw_current_token_claims, - ) = parse_v2_token(&transactions.data, &table_handle_to_owner, &mut None, 0, 0).await; 
+ ) = parse_v2_token(&transactions.data, &table_handle_to_owner, &mut None).await; let parquet_current_token_claims: Vec = raw_current_token_claims .into_iter() diff --git a/rust/sdk-processor/src/steps/token_v2_processor/token_v2_extractor.rs b/rust/sdk-processor/src/steps/token_v2_processor/token_v2_extractor.rs index 7b0ed613a..c67018ae6 100644 --- a/rust/sdk-processor/src/steps/token_v2_processor/token_v2_extractor.rs +++ b/rust/sdk-processor/src/steps/token_v2_processor/token_v2_extractor.rs @@ -31,6 +31,7 @@ use processor::{ }, }, processors::token_v2_processor::parse_v2_token, + utils::database::DbContext, }; /// Extracts fungible asset events, metadata, balances, and v1 supply from transactions @@ -107,6 +108,11 @@ impl Processable for TokenV2Extractor { // an earlier transaction has metadata (in resources) that's missing from a later transaction. let table_handle_to_owner: ahash::AHashMap = TableMetadataForToken::get_table_handle_to_owner_from_transactions(&transactions.data); + let db_connection = DbContext { + conn, + query_retries: self.query_retries, + query_retry_delay_ms: self.query_retry_delay_ms, + }; let ( collections_v2, @@ -124,9 +130,7 @@ impl Processable for TokenV2Extractor { ) = parse_v2_token( &transactions.data, &table_handle_to_owner, - &mut Some(conn), - self.query_retries, - self.query_retry_delay_ms, + &mut Some(db_connection), ) .await;
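
For context on the pattern this patch converges on: `parse_v2_token` and `process_objects` now take a single `&mut Option<DbContext>` instead of separate `conn`, `query_retries`, and `query_retry_delay_ms` arguments; the postgres path passes `Some(...)` and may fall back to a DB lookup for a missing previous owner, while the parquet path passes `None` and substitutes defaults such as `DELETED_RESOURCE_OWNER_ADDRESS`. Below is a minimal standalone sketch of that dispatch, not the real implementation: `Conn` and `lookup_previous_owner` are illustrative stand-ins for `DbPoolConnection<'_>` and the `get_current_object` query.

```
// Illustrative sketch only: `Conn` stands in for DbPoolConnection<'_>, and
// `lookup_previous_owner` for the real get_current_object DB query.
struct Conn;

struct DbContext {
    conn: Conn,
    query_retries: u32,
    query_retry_delay_ms: u64,
}

const DELETED_RESOURCE_OWNER_ADDRESS: &str = "Unknown";

// Resolve the owner of a deleted resource, mirroring the patch's branching:
// parquet (None) emits a sentinel default; postgres (Some) queries the DB.
fn owner_for_deleted_resource(db_context: &mut Option<DbContext>) -> String {
    match db_context {
        // Parquet path: no DB connection, so emit the default and let the
        // downstream QC/load pipeline reconcile the real owner.
        None => DELETED_RESOURCE_OWNER_ADDRESS.to_string(),
        // Postgres path: fall back to a (stubbed) current_objects lookup.
        Some(ctx) => lookup_previous_owner(
            &mut ctx.conn,
            ctx.query_retries,
            ctx.query_retry_delay_ms,
        ),
    }
}

fn lookup_previous_owner(_conn: &mut Conn, _retries: u32, _delay_ms: u64) -> String {
    "0xprevious_owner".to_string() // stand-in for the real query
}

fn main() {
    let mut parquet_ctx: Option<DbContext> = None;
    let mut postgres_ctx = Some(DbContext {
        conn: Conn,
        query_retries: 5,
        query_retry_delay_ms: 500,
    });
    assert_eq!(owner_for_deleted_resource(&mut parquet_ctx), "Unknown");
    assert_eq!(owner_for_deleted_resource(&mut postgres_ctx), "0xprevious_owner");
}
```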