From a9060160e367277e8dbf457cfdc8ad181792519a Mon Sep 17 00:00:00 2001 From: yuunlimm Date: Wed, 11 Dec 2024 10:35:06 -0800 Subject: [PATCH] [parquet-sdk]migrate objects [pt1] --- .../src/models/account_transaction_models.rs | 9 +- .../raw_account_transactions.rs | 9 +- .../raw_v2_fungible_asset_activities.rs | 6 +- .../raw_v2_fungible_asset_balances.rs | 2 +- .../raw_v2_fungible_metadata.rs | 6 +- rust/processor/src/db/common/models/mod.rs | 1 + .../src/db/common/models/object_models/mod.rs | 5 + .../models/object_models/raw_v2_objects.rs | 238 ++++++++++++++++++ .../models/object_models/v2_object_utils.rs | 0 .../raw_v2_token_activities.rs | 12 +- .../token_v2_models/raw_v2_token_datas.rs | 6 +- .../token_v2_models/raw_v2_token_metadata.rs | 12 +- .../raw_v2_token_ownerships.rs | 10 +- .../models/token_v2_models/v2_token_utils.rs | 6 +- .../db/parquet/models/object_models/mod.rs | 4 + .../models/object_models/v2_objects.rs | 53 ++++ .../db/postgres/models/object_models/mod.rs | 1 - .../models/object_models/v2_objects.rs | 212 +++------------- .../src/db/postgres/models/resources.rs | 10 +- .../models/token_v2_models/v2_collections.rs | 6 +- .../processors/fungible_asset_processor.rs | 32 +-- .../src/processors/nft_metadata_processor.rs | 8 +- .../src/processors/objects_processor.rs | 29 ++- ...uet_fungible_asset_activities_processor.rs | 18 +- .../parquet_fungible_asset_processor.rs | 15 +- .../parquet_token_v2_processor.rs | 20 +- .../src/processors/token_v2_processor.rs | 45 ++-- .../objects_processor/objects_extractor.rs | 29 ++- 28 files changed, 505 insertions(+), 299 deletions(-) create mode 100644 rust/processor/src/db/common/models/object_models/mod.rs create mode 100644 rust/processor/src/db/common/models/object_models/raw_v2_objects.rs rename rust/processor/src/db/{postgres => common}/models/object_models/v2_object_utils.rs (100%) create mode 100644 rust/processor/src/db/parquet/models/object_models/mod.rs create mode 100644 rust/processor/src/db/parquet/models/object_models/v2_objects.rs diff --git a/rust/integration-tests/src/models/account_transaction_models.rs b/rust/integration-tests/src/models/account_transaction_models.rs index 2f80795ca..1c2538eb3 100644 --- a/rust/integration-tests/src/models/account_transaction_models.rs +++ b/rust/integration-tests/src/models/account_transaction_models.rs @@ -10,9 +10,12 @@ use aptos_protos::transaction::v1::{transaction::TxnData, write_set_change::Chan use diesel::{Identifiable, Insertable, Queryable}; use field_count::FieldCount; use processor::{ - db::postgres::models::{ - object_models::v2_object_utils::ObjectWithMetadata, resources::FromWriteResource, - user_transactions_models::user_transactions::UserTransaction, + db::{ + common::models::object_models::v2_object_utils::ObjectWithMetadata, + postgres::models::{ + resources::FromWriteResource, + user_transactions_models::user_transactions::UserTransaction, + }, }, schema::account_transactions, utils::{counters::PROCESSOR_UNKNOWN_TYPE_COUNT, util::standardize_address}, diff --git a/rust/processor/src/db/common/models/account_transaction_models/raw_account_transactions.rs b/rust/processor/src/db/common/models/account_transaction_models/raw_account_transactions.rs index fb43b46f4..eff571310 100644 --- a/rust/processor/src/db/common/models/account_transaction_models/raw_account_transactions.rs +++ b/rust/processor/src/db/common/models/account_transaction_models/raw_account_transactions.rs @@ -6,9 +6,12 @@ #![allow(clippy::unused_unit)] use crate::{ - db::postgres::models::{ - object_models::v2_object_utils::ObjectWithMetadata, resources::FromWriteResource, - user_transactions_models::user_transactions::UserTransaction, + db::{ + common::models::object_models::v2_object_utils::ObjectWithMetadata, + postgres::models::{ + resources::FromWriteResource, + user_transactions_models::user_transactions::UserTransaction, + }, }, utils::{counters::PROCESSOR_UNKNOWN_TYPE_COUNT, util::standardize_address}, }; diff --git a/rust/processor/src/db/common/models/fungible_asset_models/raw_v2_fungible_asset_activities.rs b/rust/processor/src/db/common/models/fungible_asset_models/raw_v2_fungible_asset_activities.rs index 75b2d5088..2c3306a64 100644 --- a/rust/processor/src/db/common/models/fungible_asset_models/raw_v2_fungible_asset_activities.rs +++ b/rust/processor/src/db/common/models/fungible_asset_models/raw_v2_fungible_asset_activities.rs @@ -7,14 +7,16 @@ use crate::{ db::{ - common::models::token_v2_models::v2_token_utils::TokenStandard, + common::models::{ + object_models::v2_object_utils::ObjectAggregatedDataMapping, + token_v2_models::v2_token_utils::TokenStandard, + }, postgres::models::{ coin_models::{ coin_activities::CoinActivity, coin_utils::{CoinEvent, CoinInfoType, EventGuidResource}, }, fungible_asset_models::v2_fungible_asset_utils::{FeeStatement, FungibleAssetEvent}, - object_models::v2_object_utils::ObjectAggregatedDataMapping, }, }, utils::util::standardize_address, diff --git a/rust/processor/src/db/common/models/fungible_asset_models/raw_v2_fungible_asset_balances.rs b/rust/processor/src/db/common/models/fungible_asset_models/raw_v2_fungible_asset_balances.rs index 12337b7d4..90f1b8576 100644 --- a/rust/processor/src/db/common/models/fungible_asset_models/raw_v2_fungible_asset_balances.rs +++ b/rust/processor/src/db/common/models/fungible_asset_models/raw_v2_fungible_asset_balances.rs @@ -9,12 +9,12 @@ use crate::{ db::{ common::models::{ fungible_asset_models::raw_v2_fungible_asset_activities::EventToCoinType, + object_models::v2_object_utils::ObjectAggregatedDataMapping, token_v2_models::v2_token_utils::{TokenStandard, V2_STANDARD}, }, postgres::models::{ coin_models::coin_utils::{CoinInfoType, CoinResource}, fungible_asset_models::v2_fungible_asset_utils::FungibleAssetStore, - object_models::v2_object_utils::ObjectAggregatedDataMapping, resources::FromWriteResource, }, }, diff --git a/rust/processor/src/db/common/models/fungible_asset_models/raw_v2_fungible_metadata.rs b/rust/processor/src/db/common/models/fungible_asset_models/raw_v2_fungible_metadata.rs index f2dc837f2..728ad50cd 100644 --- a/rust/processor/src/db/common/models/fungible_asset_models/raw_v2_fungible_metadata.rs +++ b/rust/processor/src/db/common/models/fungible_asset_models/raw_v2_fungible_metadata.rs @@ -7,11 +7,13 @@ use crate::{ db::{ - common::models::token_v2_models::v2_token_utils::TokenStandard, + common::models::{ + object_models::v2_object_utils::ObjectAggregatedDataMapping, + token_v2_models::v2_token_utils::TokenStandard, + }, postgres::models::{ coin_models::coin_utils::{CoinInfoType, CoinResource}, fungible_asset_models::v2_fungible_asset_utils::FungibleAssetMetadata, - object_models::v2_object_utils::ObjectAggregatedDataMapping, resources::FromWriteResource, }, }, diff --git a/rust/processor/src/db/common/models/mod.rs b/rust/processor/src/db/common/models/mod.rs index 6e0395836..3e0c98d42 100644 --- a/rust/processor/src/db/common/models/mod.rs +++ b/rust/processor/src/db/common/models/mod.rs @@ -3,4 +3,5 @@ pub mod ans_models; pub mod default_models; pub mod event_models; pub mod fungible_asset_models; +pub mod object_models; pub mod token_v2_models; diff --git a/rust/processor/src/db/common/models/object_models/mod.rs b/rust/processor/src/db/common/models/object_models/mod.rs new file mode 100644 index 000000000..b1fc4057e --- /dev/null +++ b/rust/processor/src/db/common/models/object_models/mod.rs @@ -0,0 +1,5 @@ +// Copyright © Aptos Foundation +// SPDX-License-Identifier: Apache-2.0 + +pub mod raw_v2_objects; +pub mod v2_object_utils; diff --git a/rust/processor/src/db/common/models/object_models/raw_v2_objects.rs b/rust/processor/src/db/common/models/object_models/raw_v2_objects.rs new file mode 100644 index 000000000..a80a504c7 --- /dev/null +++ b/rust/processor/src/db/common/models/object_models/raw_v2_objects.rs @@ -0,0 +1,238 @@ +// Copyright © Aptos Foundation +// SPDX-License-Identifier: Apache-2.0 + +// This is required because a diesel macro makes clippy sad +#![allow(clippy::extra_unused_lifetimes)] +#![allow(clippy::unused_unit)] + +use super::v2_object_utils::{CurrentObjectPK, ObjectAggregatedDataMapping}; +use crate::{ + db::postgres::models::default_models::move_resources::MoveResource, + schema::current_objects, + utils::{database::DbPoolConnection, util::standardize_address}, +}; +use ahash::AHashMap; +use aptos_protos::transaction::v1::{DeleteResource, WriteResource}; +use bigdecimal::BigDecimal; +use diesel::prelude::*; +use diesel_async::RunQueryDsl; +use field_count::FieldCount; +use serde::{Deserialize, Serialize}; + +#[derive(Clone, Debug, Deserialize, FieldCount, Serialize)] +pub struct RawObject { + pub transaction_version: i64, + pub write_set_change_index: i64, + pub object_address: String, + pub owner_address: String, + pub state_key_hash: String, + pub guid_creation_num: BigDecimal, + pub allow_ungated_transfer: bool, + pub is_deleted: bool, + pub untransferrable: bool, +} + +pub trait ObjectConvertible { + fn from_raw(raw_item: RawObject) -> Self; +} + +#[derive(Clone, Debug, Deserialize, FieldCount, Serialize)] +pub struct RawCurrentObject { + pub object_address: String, + pub owner_address: String, + pub state_key_hash: String, + pub allow_ungated_transfer: bool, + pub last_guid_creation_num: BigDecimal, + pub last_transaction_version: i64, + pub is_deleted: bool, + pub untransferrable: bool, +} + +pub trait CurrentObjectConvertible { + fn from_raw(raw_item: RawCurrentObject) -> Self; +} + +#[derive(Debug, Deserialize, Identifiable, Queryable, Serialize)] +#[diesel(primary_key(object_address))] +#[diesel(table_name = current_objects)] +pub struct CurrentObjectQuery { + pub object_address: String, + pub owner_address: String, + pub state_key_hash: String, + pub allow_ungated_transfer: bool, + pub last_guid_creation_num: BigDecimal, + pub last_transaction_version: i64, + pub is_deleted: bool, + pub inserted_at: chrono::NaiveDateTime, + pub untransferrable: bool, +} + +impl RawObject { + pub fn from_write_resource( + write_resource: &WriteResource, + txn_version: i64, + write_set_change_index: i64, + object_metadata_mapping: &ObjectAggregatedDataMapping, + ) -> anyhow::Result> { + let address = standardize_address(&write_resource.address.to_string()); + println!("address: {:?}", address); + if let Some(object_aggregated_metadata) = object_metadata_mapping.get(&address) { + println!( + "object_aggregated_metadata: {:?}", + object_aggregated_metadata + ); + // do something + let object_with_metadata = object_aggregated_metadata.object.clone(); + let object_core = object_with_metadata.object_core; + + let untransferrable = if object_aggregated_metadata.untransferable.as_ref().is_some() { + true + } else { + !object_core.allow_ungated_transfer + }; + Ok(Some(( + Self { + transaction_version: txn_version, + write_set_change_index, + object_address: address.clone(), + owner_address: object_core.get_owner_address(), + state_key_hash: object_with_metadata.state_key_hash.clone(), + guid_creation_num: object_core.guid_creation_num.clone(), + allow_ungated_transfer: object_core.allow_ungated_transfer, + is_deleted: false, + untransferrable, + }, + RawCurrentObject { + object_address: address, + owner_address: object_core.get_owner_address(), + state_key_hash: object_with_metadata.state_key_hash, + allow_ungated_transfer: object_core.allow_ungated_transfer, + last_guid_creation_num: object_core.guid_creation_num.clone(), + last_transaction_version: txn_version, + is_deleted: false, + untransferrable, + }, + ))) + } else { + Ok(None) + } + } + + /// This handles the case where the entire object is deleted + /// TODO: We need to detect if an object is only partially deleted + /// using KV store + pub async fn from_delete_resource( + delete_resource: &DeleteResource, + txn_version: i64, + write_set_change_index: i64, + object_mapping: &AHashMap, + conn: &mut DbPoolConnection<'_>, + query_retries: u32, + query_retry_delay_ms: u64, + ) -> anyhow::Result> { + if delete_resource.type_str == "0x1::object::ObjectGroup" { + let resource = MoveResource::from_delete_resource( + delete_resource, + 0, // Placeholder, this isn't used anyway + txn_version, + 0, // Placeholder, this isn't used anyway + ); + let previous_object = if let Some(object) = object_mapping.get(&resource.address) { + object.clone() + } else { + match Self::get_current_object( + conn, + &resource.address, + query_retries, + query_retry_delay_ms, + ) + .await + { + Ok(object) => object, + Err(_) => { + tracing::error!( + transaction_version = txn_version, + lookup_key = &resource.address, + "Missing current_object for object_address: {}. You probably should backfill db.", + resource.address, + ); + return Ok(None); + }, + } + }; + Ok(Some(( + Self { + transaction_version: txn_version, + write_set_change_index, + object_address: resource.address.clone(), + owner_address: previous_object.owner_address.clone(), + state_key_hash: resource.state_key_hash.clone(), + guid_creation_num: previous_object.last_guid_creation_num.clone(), + allow_ungated_transfer: previous_object.allow_ungated_transfer, + is_deleted: true, + untransferrable: previous_object.untransferrable, + }, + RawCurrentObject { + object_address: resource.address, + owner_address: previous_object.owner_address.clone(), + state_key_hash: resource.state_key_hash, + last_guid_creation_num: previous_object.last_guid_creation_num.clone(), + allow_ungated_transfer: previous_object.allow_ungated_transfer, + last_transaction_version: txn_version, + is_deleted: true, + untransferrable: previous_object.untransferrable, + }, + ))) + } else { + Ok(None) + } + } + + /// This is actually not great because object owner can change. The best we can do now though. + /// This will loop forever until we get the object from the db + pub async fn get_current_object( + conn: &mut DbPoolConnection<'_>, + object_address: &str, + query_retries: u32, + query_retry_delay_ms: u64, + ) -> anyhow::Result { + let mut tried = 0; + while tried < query_retries { + tried += 1; + match CurrentObjectQuery::get_by_address(object_address, conn).await { + Ok(res) => { + return Ok(RawCurrentObject { + object_address: res.object_address, + owner_address: res.owner_address, + state_key_hash: res.state_key_hash, + allow_ungated_transfer: res.allow_ungated_transfer, + last_guid_creation_num: res.last_guid_creation_num, + last_transaction_version: res.last_transaction_version, + is_deleted: res.is_deleted, + untransferrable: res.untransferrable, + }); + }, + Err(_) => { + if tried < query_retries { + tokio::time::sleep(std::time::Duration::from_millis(query_retry_delay_ms)) + .await; + } + }, + } + } + Err(anyhow::anyhow!("Failed to get object owner")) + } +} + +impl CurrentObjectQuery { + /// TODO: Change this to a KV store + pub async fn get_by_address( + object_address: &str, + conn: &mut DbPoolConnection<'_>, + ) -> diesel::QueryResult { + current_objects::table + .filter(current_objects::object_address.eq(object_address)) + .first::(conn) + .await + } +} diff --git a/rust/processor/src/db/postgres/models/object_models/v2_object_utils.rs b/rust/processor/src/db/common/models/object_models/v2_object_utils.rs similarity index 100% rename from rust/processor/src/db/postgres/models/object_models/v2_object_utils.rs rename to rust/processor/src/db/common/models/object_models/v2_object_utils.rs diff --git a/rust/processor/src/db/common/models/token_v2_models/raw_v2_token_activities.rs b/rust/processor/src/db/common/models/token_v2_models/raw_v2_token_activities.rs index e87efdfeb..264074e17 100644 --- a/rust/processor/src/db/common/models/token_v2_models/raw_v2_token_activities.rs +++ b/rust/processor/src/db/common/models/token_v2_models/raw_v2_token_activities.rs @@ -7,14 +7,14 @@ use crate::{ db::{ - common::models::token_v2_models::{ - raw_token_claims::TokenV1Claimed, - v2_token_utils::{TokenStandard, V2TokenEvent}, - }, - postgres::models::{ + common::models::{ object_models::v2_object_utils::ObjectAggregatedDataMapping, - token_models::token_utils::{TokenDataIdType, TokenEvent}, + token_v2_models::{ + raw_token_claims::TokenV1Claimed, + v2_token_utils::{TokenStandard, V2TokenEvent}, + }, }, + postgres::models::token_models::token_utils::{TokenDataIdType, TokenEvent}, }, utils::util::standardize_address, }; diff --git a/rust/processor/src/db/common/models/token_v2_models/raw_v2_token_datas.rs b/rust/processor/src/db/common/models/token_v2_models/raw_v2_token_datas.rs index 4b2ca9bdb..1ed52b35c 100644 --- a/rust/processor/src/db/common/models/token_v2_models/raw_v2_token_datas.rs +++ b/rust/processor/src/db/common/models/token_v2_models/raw_v2_token_datas.rs @@ -7,9 +7,11 @@ use crate::{ db::{ - common::models::token_v2_models::v2_token_utils::{TokenStandard, TokenV2, TokenV2Burned}, - postgres::models::{ + common::models::{ object_models::v2_object_utils::ObjectAggregatedDataMapping, + token_v2_models::v2_token_utils::{TokenStandard, TokenV2, TokenV2Burned}, + }, + postgres::models::{ resources::FromWriteResource, token_models::token_utils::TokenWriteSet, }, }, diff --git a/rust/processor/src/db/common/models/token_v2_models/raw_v2_token_metadata.rs b/rust/processor/src/db/common/models/token_v2_models/raw_v2_token_metadata.rs index 76cb89e23..0b9cd405b 100644 --- a/rust/processor/src/db/common/models/token_v2_models/raw_v2_token_metadata.rs +++ b/rust/processor/src/db/common/models/token_v2_models/raw_v2_token_metadata.rs @@ -6,11 +6,13 @@ #![allow(clippy::unused_unit)] use crate::{ - db::postgres::models::{ - default_models::move_resources::MoveResource, - object_models::v2_object_utils::ObjectAggregatedDataMapping, - resources::{COIN_ADDR, TOKEN_ADDR, TOKEN_V2_ADDR}, - token_models::token_utils::NAME_LENGTH, + db::{ + common::models::object_models::v2_object_utils::ObjectAggregatedDataMapping, + postgres::models::{ + default_models::move_resources::MoveResource, + resources::{COIN_ADDR, TOKEN_ADDR, TOKEN_V2_ADDR}, + token_models::token_utils::NAME_LENGTH, + }, }, utils::util::{standardize_address, truncate_str}, }; diff --git a/rust/processor/src/db/common/models/token_v2_models/raw_v2_token_ownerships.rs b/rust/processor/src/db/common/models/token_v2_models/raw_v2_token_ownerships.rs index 545a77ad7..f266fc457 100644 --- a/rust/processor/src/db/common/models/token_v2_models/raw_v2_token_ownerships.rs +++ b/rust/processor/src/db/common/models/token_v2_models/raw_v2_token_ownerships.rs @@ -7,12 +7,14 @@ use crate::{ db::{ - common::models::token_v2_models::{ - raw_v2_token_datas::RawTokenDataV2, - v2_token_utils::{TokenStandard, TokenV2Burned, DEFAULT_OWNER_ADDRESS}, + common::models::{ + object_models::v2_object_utils::{ObjectAggregatedDataMapping, ObjectWithMetadata}, + token_v2_models::{ + raw_v2_token_datas::RawTokenDataV2, + v2_token_utils::{TokenStandard, TokenV2Burned, DEFAULT_OWNER_ADDRESS}, + }, }, postgres::models::{ - object_models::v2_object_utils::{ObjectAggregatedDataMapping, ObjectWithMetadata}, resources::FromWriteResource, token_models::{token_utils::TokenWriteSet, tokens::TableHandleToOwner}, }, diff --git a/rust/processor/src/db/common/models/token_v2_models/v2_token_utils.rs b/rust/processor/src/db/common/models/token_v2_models/v2_token_utils.rs index b6bab62bf..b0df33627 100644 --- a/rust/processor/src/db/common/models/token_v2_models/v2_token_utils.rs +++ b/rust/processor/src/db/common/models/token_v2_models/v2_token_utils.rs @@ -5,9 +5,9 @@ #![allow(clippy::extra_unused_lifetimes)] use crate::{ - db::postgres::models::{ - object_models::v2_object_utils::CurrentObjectPK, - token_models::token_utils::{NAME_LENGTH, URI_LENGTH}, + db::{ + common::models::object_models::v2_object_utils::CurrentObjectPK, + postgres::models::token_models::token_utils::{NAME_LENGTH, URI_LENGTH}, }, utils::util::{ deserialize_from_string, deserialize_token_object_property_map_from_bcs_hexstring, diff --git a/rust/processor/src/db/parquet/models/object_models/mod.rs b/rust/processor/src/db/parquet/models/object_models/mod.rs new file mode 100644 index 000000000..4283618f4 --- /dev/null +++ b/rust/processor/src/db/parquet/models/object_models/mod.rs @@ -0,0 +1,4 @@ +// Copyright © Aptos Foundation +// SPDX-License-Identifier: Apache-2.0 + +pub mod v2_objects; diff --git a/rust/processor/src/db/parquet/models/object_models/v2_objects.rs b/rust/processor/src/db/parquet/models/object_models/v2_objects.rs new file mode 100644 index 000000000..6587eac13 --- /dev/null +++ b/rust/processor/src/db/parquet/models/object_models/v2_objects.rs @@ -0,0 +1,53 @@ +// Copyright © Aptos Foundation +// SPDX-License-Identifier: Apache-2.0 + +// This is required because a diesel macro makes clippy sad +#![allow(clippy::extra_unused_lifetimes)] +#![allow(clippy::unused_unit)] + +use super::v2_object_utils::{CurrentObjectPK, ObjectAggregatedDataMapping}; +use crate::{ + db::postgres::models::default_models::move_resources::MoveResource, + schema::{current_objects, objects}, + utils::{database::DbPoolConnection, util::standardize_address}, +}; +use ahash::AHashMap; +use aptos_protos::transaction::v1::{DeleteResource, WriteResource}; +use bigdecimal::BigDecimal; +use diesel::prelude::*; +use diesel_async::RunQueryDsl; +use field_count::FieldCount; +use serde::{Deserialize, Serialize}; + +#[derive(Clone, Debug, Deserialize, FieldCount, Identifiable, Insertable, Serialize)] +pub struct RawObject { + pub transaction_version: i64, + pub write_set_change_index: i64, + pub object_address: String, + pub owner_address: String, + pub state_key_hash: String, + pub guid_creation_num: BigDecimal, + pub allow_ungated_transfer: bool, + pub is_deleted: bool, + pub untransferrable: bool, +} + +pub trait ObjectConvertible { + fn from_raw(raw_item: RawObject) -> Self; +} + +#[derive(Clone, Debug, Deserialize, FieldCount, Identifiable, Insertable, Serialize)] +pub struct RawCurrentObject { + pub object_address: String, + pub owner_address: String, + pub state_key_hash: String, + pub allow_ungated_transfer: bool, + pub last_guid_creation_num: BigDecimal, + pub last_transaction_version: i64, + pub is_deleted: bool, + pub untransferrable: bool, +} + +pub trait CurrentObjectConvertible { + fn from_raw(raw_item: RawCurrentObject) -> Self; +} diff --git a/rust/processor/src/db/postgres/models/object_models/mod.rs b/rust/processor/src/db/postgres/models/object_models/mod.rs index 63812d5c0..4283618f4 100644 --- a/rust/processor/src/db/postgres/models/object_models/mod.rs +++ b/rust/processor/src/db/postgres/models/object_models/mod.rs @@ -1,5 +1,4 @@ // Copyright © Aptos Foundation // SPDX-License-Identifier: Apache-2.0 -pub mod v2_object_utils; pub mod v2_objects; diff --git a/rust/processor/src/db/postgres/models/object_models/v2_objects.rs b/rust/processor/src/db/postgres/models/object_models/v2_objects.rs index e90ea1d90..1a0f0d654 100644 --- a/rust/processor/src/db/postgres/models/object_models/v2_objects.rs +++ b/rust/processor/src/db/postgres/models/object_models/v2_objects.rs @@ -5,17 +5,14 @@ #![allow(clippy::extra_unused_lifetimes)] #![allow(clippy::unused_unit)] -use super::v2_object_utils::{CurrentObjectPK, ObjectAggregatedDataMapping}; use crate::{ - db::postgres::models::default_models::move_resources::MoveResource, + db::common::models::object_models::raw_v2_objects::{ + CurrentObjectConvertible, ObjectConvertible, RawCurrentObject, RawObject, + }, schema::{current_objects, objects}, - utils::{database::DbPoolConnection, util::standardize_address}, }; -use ahash::AHashMap; -use aptos_protos::transaction::v1::{DeleteResource, WriteResource}; use bigdecimal::BigDecimal; use diesel::prelude::*; -use diesel_async::RunQueryDsl; use field_count::FieldCount; use serde::{Deserialize, Serialize}; @@ -34,24 +31,26 @@ pub struct Object { pub untransferrable: bool, } -#[derive(Clone, Debug, Deserialize, FieldCount, Identifiable, Insertable, Serialize)] -#[diesel(primary_key(object_address))] -#[diesel(table_name = current_objects)] -pub struct CurrentObject { - pub object_address: String, - pub owner_address: String, - pub state_key_hash: String, - pub allow_ungated_transfer: bool, - pub last_guid_creation_num: BigDecimal, - pub last_transaction_version: i64, - pub is_deleted: bool, - pub untransferrable: bool, +impl ObjectConvertible for Object { + fn from_raw(raw_item: RawObject) -> Self { + Self { + transaction_version: raw_item.transaction_version, + write_set_change_index: raw_item.write_set_change_index, + object_address: raw_item.object_address, + owner_address: raw_item.owner_address, + state_key_hash: raw_item.state_key_hash, + guid_creation_num: raw_item.guid_creation_num, + allow_ungated_transfer: raw_item.allow_ungated_transfer, + is_deleted: raw_item.is_deleted, + untransferrable: raw_item.untransferrable, + } + } } -#[derive(Debug, Deserialize, Identifiable, Queryable, Serialize)] +#[derive(Clone, Debug, Deserialize, FieldCount, Identifiable, Insertable, Serialize)] #[diesel(primary_key(object_address))] #[diesel(table_name = current_objects)] -pub struct CurrentObjectQuery { +pub struct CurrentObject { pub object_address: String, pub owner_address: String, pub state_key_hash: String, @@ -59,171 +58,20 @@ pub struct CurrentObjectQuery { pub last_guid_creation_num: BigDecimal, pub last_transaction_version: i64, pub is_deleted: bool, - pub inserted_at: chrono::NaiveDateTime, pub untransferrable: bool, } -impl Object { - pub fn from_write_resource( - write_resource: &WriteResource, - txn_version: i64, - write_set_change_index: i64, - object_metadata_mapping: &ObjectAggregatedDataMapping, - ) -> anyhow::Result> { - let address = standardize_address(&write_resource.address.to_string()); - if let Some(object_aggregated_metadata) = object_metadata_mapping.get(&address) { - // do something - let object_with_metadata = object_aggregated_metadata.object.clone(); - let object_core = object_with_metadata.object_core; - - let untransferrable = if object_aggregated_metadata.untransferable.as_ref().is_some() { - true - } else { - !object_core.allow_ungated_transfer - }; - Ok(Some(( - Self { - transaction_version: txn_version, - write_set_change_index, - object_address: address.clone(), - owner_address: object_core.get_owner_address(), - state_key_hash: object_with_metadata.state_key_hash.clone(), - guid_creation_num: object_core.guid_creation_num.clone(), - allow_ungated_transfer: object_core.allow_ungated_transfer, - is_deleted: false, - untransferrable, - }, - CurrentObject { - object_address: address, - owner_address: object_core.get_owner_address(), - state_key_hash: object_with_metadata.state_key_hash, - allow_ungated_transfer: object_core.allow_ungated_transfer, - last_guid_creation_num: object_core.guid_creation_num.clone(), - last_transaction_version: txn_version, - is_deleted: false, - untransferrable, - }, - ))) - } else { - Ok(None) - } - } - - /// This handles the case where the entire object is deleted - /// TODO: We need to detect if an object is only partially deleted - /// using KV store - pub async fn from_delete_resource( - delete_resource: &DeleteResource, - txn_version: i64, - write_set_change_index: i64, - object_mapping: &AHashMap, - conn: &mut DbPoolConnection<'_>, - query_retries: u32, - query_retry_delay_ms: u64, - ) -> anyhow::Result> { - if delete_resource.type_str == "0x1::object::ObjectGroup" { - let resource = MoveResource::from_delete_resource( - delete_resource, - 0, // Placeholder, this isn't used anyway - txn_version, - 0, // Placeholder, this isn't used anyway - ); - let previous_object = if let Some(object) = object_mapping.get(&resource.address) { - object.clone() - } else { - match Self::get_current_object( - conn, - &resource.address, - query_retries, - query_retry_delay_ms, - ) - .await - { - Ok(object) => object, - Err(_) => { - tracing::error!( - transaction_version = txn_version, - lookup_key = &resource.address, - "Missing current_object for object_address: {}. You probably should backfill db.", - resource.address, - ); - return Ok(None); - }, - } - }; - Ok(Some(( - Self { - transaction_version: txn_version, - write_set_change_index, - object_address: resource.address.clone(), - owner_address: previous_object.owner_address.clone(), - state_key_hash: resource.state_key_hash.clone(), - guid_creation_num: previous_object.last_guid_creation_num.clone(), - allow_ungated_transfer: previous_object.allow_ungated_transfer, - is_deleted: true, - untransferrable: previous_object.untransferrable, - }, - CurrentObject { - object_address: resource.address, - owner_address: previous_object.owner_address.clone(), - state_key_hash: resource.state_key_hash, - last_guid_creation_num: previous_object.last_guid_creation_num.clone(), - allow_ungated_transfer: previous_object.allow_ungated_transfer, - last_transaction_version: txn_version, - is_deleted: true, - untransferrable: previous_object.untransferrable, - }, - ))) - } else { - Ok(None) +impl CurrentObjectConvertible for CurrentObject { + fn from_raw(raw_item: RawCurrentObject) -> Self { + Self { + object_address: raw_item.object_address, + owner_address: raw_item.owner_address, + state_key_hash: raw_item.state_key_hash, + allow_ungated_transfer: raw_item.allow_ungated_transfer, + last_guid_creation_num: raw_item.last_guid_creation_num, + last_transaction_version: raw_item.last_transaction_version, + is_deleted: raw_item.is_deleted, + untransferrable: raw_item.untransferrable, } } - - /// This is actually not great because object owner can change. The best we can do now though. - /// This will loop forever until we get the object from the db - pub async fn get_current_object( - conn: &mut DbPoolConnection<'_>, - object_address: &str, - query_retries: u32, - query_retry_delay_ms: u64, - ) -> anyhow::Result { - let mut tried = 0; - while tried < query_retries { - tried += 1; - match CurrentObjectQuery::get_by_address(object_address, conn).await { - Ok(res) => { - return Ok(CurrentObject { - object_address: res.object_address, - owner_address: res.owner_address, - state_key_hash: res.state_key_hash, - allow_ungated_transfer: res.allow_ungated_transfer, - last_guid_creation_num: res.last_guid_creation_num, - last_transaction_version: res.last_transaction_version, - is_deleted: res.is_deleted, - untransferrable: res.untransferrable, - }); - }, - Err(_) => { - if tried < query_retries { - tokio::time::sleep(std::time::Duration::from_millis(query_retry_delay_ms)) - .await; - } - }, - } - } - Err(anyhow::anyhow!("Failed to get object owner")) - } -} - -impl CurrentObjectQuery { - /// TODO: Change this to a KV store - pub async fn get_by_address( - object_address: &str, - conn: &mut DbPoolConnection<'_>, - ) -> diesel::QueryResult { - current_objects::table - .filter(current_objects::object_address.eq(object_address)) - .first::(conn) - .await - } } diff --git a/rust/processor/src/db/postgres/models/resources.rs b/rust/processor/src/db/postgres/models/resources.rs index cc26cea9e..0b6fed3c4 100644 --- a/rust/processor/src/db/postgres/models/resources.rs +++ b/rust/processor/src/db/postgres/models/resources.rs @@ -2,9 +2,12 @@ // SPDX-License-Identifier: Apache-2.0 use crate::db::{ - common::models::token_v2_models::v2_token_utils::{ - AptosCollection, Collection, ConcurrentSupply, FixedSupply, PropertyMapModel, - TokenIdentifiers, TokenV2, UnlimitedSupply, + common::models::{ + object_models::v2_object_utils::{ObjectCore, Untransferable}, + token_v2_models::v2_token_utils::{ + AptosCollection, Collection, ConcurrentSupply, FixedSupply, PropertyMapModel, + TokenIdentifiers, TokenV2, UnlimitedSupply, + }, }, postgres::models::{ default_models::move_resources::MoveResource, @@ -12,7 +15,6 @@ use crate::db::{ ConcurrentFungibleAssetBalance, ConcurrentFungibleAssetSupply, FungibleAssetMetadata, FungibleAssetStore, FungibleAssetSupply, }, - object_models::v2_object_utils::{ObjectCore, Untransferable}, }, }; use anyhow::Result; diff --git a/rust/processor/src/db/postgres/models/token_v2_models/v2_collections.rs b/rust/processor/src/db/postgres/models/token_v2_models/v2_collections.rs index 6b0a55d71..e17686efd 100644 --- a/rust/processor/src/db/postgres/models/token_v2_models/v2_collections.rs +++ b/rust/processor/src/db/postgres/models/token_v2_models/v2_collections.rs @@ -7,9 +7,11 @@ use crate::{ db::{ - common::models::token_v2_models::v2_token_utils::{Collection, TokenStandard}, - postgres::models::{ + common::models::{ object_models::v2_object_utils::ObjectAggregatedDataMapping, + token_v2_models::v2_token_utils::{Collection, TokenStandard}, + }, + postgres::models::{ resources::FromWriteResource, token_models::{ collection_datas::CollectionData, diff --git a/rust/processor/src/processors/fungible_asset_processor.rs b/rust/processor/src/processors/fungible_asset_processor.rs index 40e5f7e7a..47e6ffc94 100644 --- a/rust/processor/src/processors/fungible_asset_processor.rs +++ b/rust/processor/src/processors/fungible_asset_processor.rs @@ -4,19 +4,24 @@ use super::{DefaultProcessingResult, ProcessorName, ProcessorTrait}; use crate::{ db::{ - common::models::fungible_asset_models::{ - raw_v2_fungible_asset_activities::{ - FungibleAssetActivityConvertible, RawFungibleAssetActivity, - }, - raw_v2_fungible_asset_balances::{ - CurrentFungibleAssetBalanceConvertible, CurrentFungibleAssetMapping, - CurrentUnifiedFungibleAssetBalanceConvertible, FungibleAssetBalanceConvertible, - RawCurrentFungibleAssetBalance, RawCurrentUnifiedFungibleAssetBalance, - RawFungibleAssetBalance, + common::models::{ + fungible_asset_models::{ + raw_v2_fungible_asset_activities::{ + FungibleAssetActivityConvertible, RawFungibleAssetActivity, + }, + raw_v2_fungible_asset_balances::{ + CurrentFungibleAssetBalanceConvertible, CurrentFungibleAssetMapping, + CurrentUnifiedFungibleAssetBalanceConvertible, FungibleAssetBalanceConvertible, + RawCurrentFungibleAssetBalance, RawCurrentUnifiedFungibleAssetBalance, + RawFungibleAssetBalance, + }, + raw_v2_fungible_metadata::{ + FungibleAssetMetadataConvertible, FungibleAssetMetadataMapping, + RawFungibleAssetMetadataModel, + }, }, - raw_v2_fungible_metadata::{ - FungibleAssetMetadataConvertible, FungibleAssetMetadataMapping, - RawFungibleAssetMetadataModel, + object_models::v2_object_utils::{ + ObjectAggregatedData, ObjectAggregatedDataMapping, ObjectWithMetadata, }, }, postgres::models::{ @@ -30,9 +35,6 @@ use crate::{ v2_fungible_asset_utils::FeeStatement, v2_fungible_metadata::FungibleAssetMetadataModel, }, - object_models::v2_object_utils::{ - ObjectAggregatedData, ObjectAggregatedDataMapping, ObjectWithMetadata, - }, resources::{FromWriteResource, V2FungibleAssetResource}, }, }, diff --git a/rust/processor/src/processors/nft_metadata_processor.rs b/rust/processor/src/processors/nft_metadata_processor.rs index a5c6c610c..7ec2bf262 100644 --- a/rust/processor/src/processors/nft_metadata_processor.rs +++ b/rust/processor/src/processors/nft_metadata_processor.rs @@ -4,13 +4,13 @@ use super::{DefaultProcessingResult, ProcessorName, ProcessorTrait}; use crate::{ db::{ - common::models::token_v2_models::raw_v2_token_datas::{ - RawCurrentTokenDataV2, RawTokenDataV2, - }, - postgres::models::{ + common::models::{ object_models::v2_object_utils::{ ObjectAggregatedData, ObjectAggregatedDataMapping, ObjectWithMetadata, }, + token_v2_models::raw_v2_token_datas::{RawCurrentTokenDataV2, RawTokenDataV2}, + }, + postgres::models::{ resources::FromWriteResource, token_models::tokens::{TableHandleToOwner, TableMetadataForToken}, token_v2_models::{ diff --git a/rust/processor/src/processors/objects_processor.rs b/rust/processor/src/processors/objects_processor.rs index da9673933..79d3e87eb 100644 --- a/rust/processor/src/processors/objects_processor.rs +++ b/rust/processor/src/processors/objects_processor.rs @@ -3,15 +3,20 @@ use super::{DefaultProcessingResult, ProcessorName, ProcessorTrait}; use crate::{ - db::postgres::models::{ - object_models::{ + db::{ + common::models::object_models::{ + raw_v2_objects::{ + CurrentObjectConvertible, ObjectConvertible, RawCurrentObject, RawObject, + }, v2_object_utils::{ ObjectAggregatedData, ObjectAggregatedDataMapping, ObjectWithMetadata, Untransferable, }, - v2_objects::{CurrentObject, Object}, }, - resources::FromWriteResource, + postgres::models::{ + object_models::v2_objects::{CurrentObject, Object}, + resources::FromWriteResource, + }, }, gap_detectors::ProcessingResult, schema, @@ -195,7 +200,6 @@ impl ProcessorTrait for ObjectsProcessor { ) }) .changes; - // First pass to get all the object cores for wsc in changes.iter() { if let Change::WriteResource(wr) = wsc.change.as_ref().unwrap() { @@ -245,7 +249,8 @@ impl ProcessorTrait for ObjectsProcessor { let index: i64 = index as i64; match wsc.change.as_ref().unwrap() { Change::WriteResource(inner) => { - if let Some((object, current_object)) = &Object::from_write_resource( + println!("index: {:?}", index); + if let Some((object, current_object)) = &RawObject::from_write_resource( inner, txn_version, index, @@ -261,7 +266,7 @@ impl ProcessorTrait for ObjectsProcessor { Change::DeleteResource(inner) => { // Passing all_current_objects into the function so that we can get the owner of the deleted // resource if it was handled in the same batch - if let Some((object, current_object)) = Object::from_delete_resource( + if let Some((object, current_object)) = RawObject::from_delete_resource( inner, txn_version, index, @@ -286,13 +291,19 @@ impl ProcessorTrait for ObjectsProcessor { // Sort by PK let mut all_current_objects = all_current_objects .into_values() - .collect::>(); + .collect::>(); all_current_objects.sort_by(|a, b| a.object_address.cmp(&b.object_address)); if self.deprecated_tables.contains(TableFlags::OBJECTS) { all_objects.clear(); } + let postgres_objects: Vec = all_objects.into_iter().map(Object::from_raw).collect(); + let postgres_current_objects: Vec = all_current_objects + .into_iter() + .map(CurrentObject::from_raw) + .collect(); + let processing_duration_in_secs = processing_start.elapsed().as_secs_f64(); let db_insertion_start = std::time::Instant::now(); @@ -301,7 +312,7 @@ impl ProcessorTrait for ObjectsProcessor { self.name(), start_version, end_version, - (&all_objects, &all_current_objects), + (&postgres_objects, &postgres_current_objects), &self.per_table_chunk_sizes, ) .await; diff --git a/rust/processor/src/processors/parquet_processors/parquet_fungible_asset_activities_processor.rs b/rust/processor/src/processors/parquet_processors/parquet_fungible_asset_activities_processor.rs index 47206e948..36b316a58 100644 --- a/rust/processor/src/processors/parquet_processors/parquet_fungible_asset_activities_processor.rs +++ b/rust/processor/src/processors/parquet_processors/parquet_fungible_asset_activities_processor.rs @@ -8,19 +8,21 @@ use crate::{ ParquetProcessingResult, }, db::{ - common::models::fungible_asset_models::{ - raw_v2_fungible_asset_activities::{ - EventToCoinType, FungibleAssetActivityConvertible, RawFungibleAssetActivity, + common::models::{ + fungible_asset_models::{ + raw_v2_fungible_asset_activities::{ + EventToCoinType, FungibleAssetActivityConvertible, RawFungibleAssetActivity, + }, + raw_v2_fungible_asset_balances::RawFungibleAssetBalance, }, - raw_v2_fungible_asset_balances::RawFungibleAssetBalance, - }, - parquet::models::fungible_asset_models::parquet_v2_fungible_asset_activities::FungibleAssetActivity, - postgres::models::{ - fungible_asset_models::v2_fungible_asset_utils::FeeStatement, object_models::v2_object_utils::{ ObjectAggregatedData, ObjectAggregatedDataMapping, ObjectWithMetadata, Untransferable, }, + }, + parquet::models::fungible_asset_models::parquet_v2_fungible_asset_activities::FungibleAssetActivity, + postgres::models::{ + fungible_asset_models::v2_fungible_asset_utils::FeeStatement, resources::{FromWriteResource, V2FungibleAssetResource}, }, }, diff --git a/rust/processor/src/processors/parquet_processors/parquet_fungible_asset_processor.rs b/rust/processor/src/processors/parquet_processors/parquet_fungible_asset_processor.rs index c3683798e..33b8977b6 100644 --- a/rust/processor/src/processors/parquet_processors/parquet_fungible_asset_processor.rs +++ b/rust/processor/src/processors/parquet_processors/parquet_fungible_asset_processor.rs @@ -8,16 +8,17 @@ use crate::{ ParquetProcessingResult, }, db::{ - common::models::fungible_asset_models::raw_v2_fungible_asset_balances::{ - FungibleAssetBalanceConvertible, RawFungibleAssetBalance, - }, - parquet::models::fungible_asset_models::parquet_v2_fungible_asset_balances::FungibleAssetBalance, - postgres::models::{ - fungible_asset_models::parquet_coin_supply::CoinSupply, + common::models::{ + fungible_asset_models::raw_v2_fungible_asset_balances::{ + FungibleAssetBalanceConvertible, RawFungibleAssetBalance, + }, object_models::v2_object_utils::{ ObjectAggregatedData, ObjectAggregatedDataMapping, ObjectWithMetadata, }, - resources::FromWriteResource, + }, + parquet::models::fungible_asset_models::parquet_v2_fungible_asset_balances::FungibleAssetBalance, + postgres::models::{ + fungible_asset_models::parquet_coin_supply::CoinSupply, resources::FromWriteResource, }, }, gap_detectors::ProcessingResult, diff --git a/rust/processor/src/processors/parquet_processors/parquet_token_v2_processor.rs b/rust/processor/src/processors/parquet_processors/parquet_token_v2_processor.rs index 3a76cf3d3..6e4ece293 100644 --- a/rust/processor/src/processors/parquet_processors/parquet_token_v2_processor.rs +++ b/rust/processor/src/processors/parquet_processors/parquet_token_v2_processor.rs @@ -8,13 +8,18 @@ use crate::{ ParquetProcessingResult, }, db::{ - common::models::token_v2_models::{ - raw_v2_token_datas::{RawTokenDataV2, TokenDataV2Convertible}, - raw_v2_token_ownerships::{ - NFTOwnershipV2, RawTokenOwnershipV2, TokenOwnershipV2Convertible, + common::models::{ + object_models::v2_object_utils::{ + ObjectAggregatedData, ObjectAggregatedDataMapping, ObjectWithMetadata, }, - v2_token_utils::{ - Burn, BurnEvent, MintEvent, TokenV2Burned, TokenV2Minted, TransferEvent, + token_v2_models::{ + raw_v2_token_datas::{RawTokenDataV2, TokenDataV2Convertible}, + raw_v2_token_ownerships::{ + NFTOwnershipV2, RawTokenOwnershipV2, TokenOwnershipV2Convertible, + }, + v2_token_utils::{ + Burn, BurnEvent, MintEvent, TokenV2Burned, TokenV2Minted, TransferEvent, + }, }, }, parquet::models::token_v2_models::{ @@ -22,9 +27,6 @@ use crate::{ }, postgres::models::{ fungible_asset_models::v2_fungible_asset_utils::FungibleAssetMetadata, - object_models::v2_object_utils::{ - ObjectAggregatedData, ObjectAggregatedDataMapping, ObjectWithMetadata, - }, resources::{FromWriteResource, V2TokenResource}, token_models::tokens::{TableHandleToOwner, TableMetadataForToken}, }, diff --git a/rust/processor/src/processors/token_v2_processor.rs b/rust/processor/src/processors/token_v2_processor.rs index 7286c708e..be9faa00d 100644 --- a/rust/processor/src/processors/token_v2_processor.rs +++ b/rust/processor/src/processors/token_v2_processor.rs @@ -4,30 +4,37 @@ use super::{DefaultProcessingResult, ProcessorName, ProcessorTrait}; use crate::{ db::{ - common::models::token_v2_models::{ - raw_token_claims::{ - CurrentTokenPendingClaimConvertible, RawCurrentTokenPendingClaim, TokenV1Claimed, - }, - raw_v1_token_royalty::{CurrentTokenRoyaltyV1Convertible, RawCurrentTokenRoyaltyV1}, - raw_v2_token_activities::{RawTokenActivityV2, TokenActivityV2Convertible}, - raw_v2_token_datas::{ - CurrentTokenDataV2Convertible, RawCurrentTokenDataV2, RawTokenDataV2, - TokenDataV2Convertible, - }, - raw_v2_token_metadata::{CurrentTokenV2MetadataConvertible, RawCurrentTokenV2Metadata}, - raw_v2_token_ownerships::{ - CurrentTokenOwnershipV2Convertible, CurrentTokenOwnershipV2PK, NFTOwnershipV2, - RawCurrentTokenOwnershipV2, RawTokenOwnershipV2, TokenOwnershipV2Convertible, + common::models::{ + object_models::v2_object_utils::{ + ObjectAggregatedData, ObjectAggregatedDataMapping, ObjectWithMetadata, }, - v2_token_utils::{ - Burn, BurnEvent, Mint, MintEvent, TokenV2Burned, TokenV2Minted, TransferEvent, + token_v2_models::{ + raw_token_claims::{ + CurrentTokenPendingClaimConvertible, RawCurrentTokenPendingClaim, + TokenV1Claimed, + }, + raw_v1_token_royalty::{ + CurrentTokenRoyaltyV1Convertible, RawCurrentTokenRoyaltyV1, + }, + raw_v2_token_activities::{RawTokenActivityV2, TokenActivityV2Convertible}, + raw_v2_token_datas::{ + CurrentTokenDataV2Convertible, RawCurrentTokenDataV2, RawTokenDataV2, + TokenDataV2Convertible, + }, + raw_v2_token_metadata::{ + CurrentTokenV2MetadataConvertible, RawCurrentTokenV2Metadata, + }, + raw_v2_token_ownerships::{ + CurrentTokenOwnershipV2Convertible, CurrentTokenOwnershipV2PK, NFTOwnershipV2, + RawCurrentTokenOwnershipV2, RawTokenOwnershipV2, TokenOwnershipV2Convertible, + }, + v2_token_utils::{ + Burn, BurnEvent, Mint, MintEvent, TokenV2Burned, TokenV2Minted, TransferEvent, + }, }, }, postgres::models::{ fungible_asset_models::v2_fungible_asset_utils::FungibleAssetMetadata, - object_models::v2_object_utils::{ - ObjectAggregatedData, ObjectAggregatedDataMapping, ObjectWithMetadata, - }, resources::{FromWriteResource, V2TokenResource}, token_models::{ token_claims::CurrentTokenPendingClaim, diff --git a/rust/sdk-processor/src/steps/objects_processor/objects_extractor.rs b/rust/sdk-processor/src/steps/objects_processor/objects_extractor.rs index 60bdd4ca2..92ce006a9 100644 --- a/rust/sdk-processor/src/steps/objects_processor/objects_extractor.rs +++ b/rust/sdk-processor/src/steps/objects_processor/objects_extractor.rs @@ -8,14 +8,19 @@ use aptos_indexer_processor_sdk::{ }; use async_trait::async_trait; use processor::{ - db::postgres::models::{ - object_models::{ + db::{ + common::models::object_models::{ + raw_v2_objects::{ + CurrentObjectConvertible, ObjectConvertible, RawCurrentObject, RawObject, + }, v2_object_utils::{ ObjectAggregatedData, ObjectAggregatedDataMapping, ObjectWithMetadata, }, - v2_objects::{CurrentObject, Object}, }, - resources::FromWriteResource, + postgres::models::{ + object_models::v2_objects::{CurrentObject, Object}, + resources::FromWriteResource, + }, }, utils::table_flags::TableFlags, }; @@ -122,7 +127,7 @@ impl Processable for ObjectsExtractor { let index: i64 = index as i64; match wsc.change.as_ref().unwrap() { Change::WriteResource(inner) => { - if let Some((object, current_object)) = &Object::from_write_resource( + if let Some((object, current_object)) = &RawObject::from_write_resource( inner, txn_version, index, @@ -138,7 +143,7 @@ impl Processable for ObjectsExtractor { Change::DeleteResource(inner) => { // Passing all_current_objects into the function so that we can get the owner of the deleted // resource if it was handled in the same batch - if let Some((object, current_object)) = Object::from_delete_resource( + if let Some((object, current_object)) = RawObject::from_delete_resource( inner, txn_version, index, @@ -163,15 +168,23 @@ impl Processable for ObjectsExtractor { // Sort by PK let mut all_current_objects = all_current_objects .into_values() - .collect::>(); + .collect::>(); all_current_objects.sort_by(|a, b| a.object_address.cmp(&b.object_address)); if self.deprecated_tables.contains(TableFlags::OBJECTS) { all_objects.clear(); } + let postgres_all_objects: Vec = + all_objects.into_iter().map(Object::from_raw).collect(); + + let postgres_all_current_objects: Vec = all_current_objects + .into_iter() + .map(CurrentObject::from_raw) + .collect(); + Ok(Some(TransactionContext { - data: (all_objects, all_current_objects), + data: (postgres_all_objects, postgres_all_current_objects), metadata: transactions.metadata, })) }