From 56eecdf4870c9b8926aae134bf7935f3bb0e88bf Mon Sep 17 00:00:00 2001 From: bowenyang007 Date: Thu, 13 Jul 2023 21:33:28 -0700 Subject: [PATCH] fix object deletion --- crates/indexer/src/models/v2_objects.rs | 101 +++++++++++------- .../src/processors/default_processor.rs | 51 ++++++++- 2 files changed, 106 insertions(+), 46 deletions(-) diff --git a/crates/indexer/src/models/v2_objects.rs b/crates/indexer/src/models/v2_objects.rs index a9be8a960d85bf..8ac1ed0f4a2114 100644 --- a/crates/indexer/src/models/v2_objects.rs +++ b/crates/indexer/src/models/v2_objects.rs @@ -5,13 +5,19 @@ #![allow(clippy::extra_unused_lifetimes)] #![allow(clippy::unused_unit)] -use super::token_models::v2_token_utils::ObjectWithMetadata; +use super::token_models::{ + collection_datas::{QUERY_RETRIES, QUERY_RETRY_DELAY_MS}, + v2_token_utils::ObjectWithMetadata, +}; use crate::{ + database::PgPoolConnection, models::move_resources::MoveResource, schema::{current_objects, objects}, }; -use aptos_api_types::{DeleteResource, Transaction, WriteResource, WriteSetChange}; +use anyhow::Context; +use aptos_api_types::{DeleteResource, WriteResource}; use bigdecimal::BigDecimal; +use diesel::{prelude::*, sql_query, sql_types::Text}; use field_count::FieldCount; use serde::{Deserialize, Serialize}; use std::collections::HashMap; @@ -19,7 +25,7 @@ use std::collections::HashMap; // PK of current_objects, i.e. object_address pub type CurrentObjectPK = String; -#[derive(Debug, Deserialize, FieldCount, Identifiable, Insertable, Serialize)] +#[derive(Clone, Debug, Deserialize, FieldCount, Identifiable, Insertable, Serialize)] #[diesel(primary_key(transaction_version, write_set_change_index))] #[diesel(table_name = objects)] pub struct Object { @@ -33,12 +39,12 @@ pub struct Object { pub is_deleted: bool, } -#[derive(Debug, Deserialize, FieldCount, Identifiable, Insertable, Serialize)] +#[derive(Clone, Debug, Deserialize, FieldCount, Identifiable, Insertable, Serialize)] #[diesel(primary_key(object_address))] #[diesel(table_name = current_objects)] pub struct CurrentObject { pub object_address: String, - pub owner_address: Option, + pub owner_address: String, pub state_key_hash: String, pub allow_ungated_transfer: Option, pub last_guid_creation_num: Option, @@ -46,39 +52,14 @@ pub struct CurrentObject { pub is_deleted: bool, } -impl Object { - /// Only parsing 0x1 ObjectCore from transactions - pub fn from_transaction( - transaction: &Transaction, - ) -> (Vec, HashMap) { - if let Transaction::UserTransaction(user_txn) = transaction { - let mut objects = vec![]; - let mut current_objects: HashMap = HashMap::new(); - let txn_version = user_txn.info.version.0 as i64; - - for (index, wsc) in user_txn.info.changes.iter().enumerate() { - let index = index as i64; - let maybe_object_combo = match wsc { - WriteSetChange::DeleteResource(inner) => { - Self::from_delete_resource(inner, txn_version, index).unwrap() - }, - WriteSetChange::WriteResource(inner) => { - Self::from_write_resource(inner, txn_version, index).unwrap() - }, - _ => None, - }; - if let Some((object, current_object)) = maybe_object_combo { - objects.push(object); - current_objects.insert(current_object.object_address.clone(), current_object); - } - } - (objects, current_objects) - } else { - Default::default() - } - } +#[derive(Debug, QueryableByName)] +pub struct ObjectOwner { + #[diesel(sql_type = Text)] + pub owner_address: String, +} - fn from_write_resource( +impl Object { + pub fn from_write_resource( write_resource: &WriteResource, txn_version: i64, write_set_change_index: i64, @@ -104,7 +85,7 @@ impl Object { }, CurrentObject { object_address: resource.address, - owner_address: Some(object_core.get_owner_address()), + owner_address: object_core.get_owner_address(), state_key_hash: resource.state_key_hash, allow_ungated_transfer: Some(object_core.allow_ungated_transfer), last_guid_creation_num: Some(object_core.guid_creation_num.clone()), @@ -120,10 +101,12 @@ impl Object { /// This should never really happen since it's very difficult to delete the entire resource group /// currently. We actually need a better way of detecting whether an object is deleted since there /// is likely no delete resource write set change. - fn from_delete_resource( + pub fn from_delete_resource( delete_resource: &DeleteResource, txn_version: i64, write_set_change_index: i64, + object_mapping: &HashMap, + conn: &mut PgPoolConnection, ) -> anyhow::Result> { if delete_resource.resource.to_string() == "0x1::object::ObjectGroup" { let resource = MoveResource::from_delete_resource( @@ -132,12 +115,17 @@ impl Object { txn_version, 0, // Placeholder, this isn't used anyway ); + let owner_address = if let Some(object) = object_mapping.get(&resource.address) { + object.owner_address.clone() + } else { + Self::get_object_owner(conn, &resource.address)? + }; Ok(Some(( Self { transaction_version: txn_version, write_set_change_index, object_address: resource.address.clone(), - owner_address: None, + owner_address: Some(owner_address.clone()), state_key_hash: resource.state_key_hash.clone(), guid_creation_num: None, allow_ungated_transfer: None, @@ -145,7 +133,7 @@ impl Object { }, CurrentObject { object_address: resource.address, - owner_address: None, + owner_address, state_key_hash: resource.state_key_hash, allow_ungated_transfer: None, last_guid_creation_num: None, @@ -157,4 +145,35 @@ impl Object { Ok(None) } } + + /// This is actually not great because object owner can change. The best we can do now though + fn get_object_owner( + conn: &mut PgPoolConnection, + object_address: &str, + ) -> anyhow::Result { + let mut retried = 0; + while retried < QUERY_RETRIES { + retried += 1; + match Self::get_by_address(conn, object_address) { + Ok(res) => return Ok(res), + Err(_) => { + std::thread::sleep(std::time::Duration::from_millis(QUERY_RETRY_DELAY_MS)); + }, + } + } + Err(anyhow::anyhow!("Failed to get object owner")) + } + + /// TODO: Change this to a KV store + fn get_by_address(conn: &mut PgPoolConnection, object_address: &str) -> anyhow::Result { + let mut res: Vec> = + sql_query("SELECT owner_address FROM current_objects WHERE object_address = $1") + .bind::(object_address) + .get_results(conn)?; + Ok(res + .pop() + .context("collection result empty")? + .context("collection result null")? + .owner_address) + } } diff --git a/crates/indexer/src/processors/default_processor.rs b/crates/indexer/src/processors/default_processor.rs index dd9294e4ad6990..2a65adb6448214 100644 --- a/crates/indexer/src/processors/default_processor.rs +++ b/crates/indexer/src/processors/default_processor.rs @@ -23,7 +23,7 @@ use crate::{ }, schema, }; -use aptos_api_types::Transaction; +use aptos_api_types::{Transaction, WriteSetChange}; use async_trait::async_trait; use diesel::{pg::upsert::excluded, result::Error, ExpressionMethods, PgConnection}; use field_count::FieldCount; @@ -481,6 +481,8 @@ impl TransactionProcessor for DefaultTransactionProcessor { start_version: u64, end_version: u64, ) -> Result { + let mut conn = self.get_conn(); + let (txns, txn_details, events, write_set_changes, wsc_details) = TransactionModel::from_transactions(&transactions); @@ -527,9 +529,49 @@ impl TransactionProcessor for DefaultTransactionProcessor { let mut all_objects = vec![]; let mut all_current_objects = HashMap::new(); for txn in &transactions { - let (mut objects, current_objects) = Object::from_transaction(txn); - all_objects.append(&mut objects); - all_current_objects.extend(current_objects); + let (changes, txn_version) = match txn { + Transaction::UserTransaction(user_txn) => ( + user_txn.info.changes.clone(), + user_txn.info.version.0 as i64, + ), + Transaction::BlockMetadataTransaction(bmt_txn) => { + (bmt_txn.info.changes.clone(), bmt_txn.info.version.0 as i64) + }, + _ => continue, + }; + + for (index, wsc) in changes.iter().enumerate() { + let index = index as i64; + match wsc { + WriteSetChange::WriteResource(inner) => { + if let Some((object, current_object)) = + &Object::from_write_resource(inner, txn_version, index).unwrap() + { + all_objects.push(object.clone()); + all_current_objects + .insert(object.object_address.clone(), current_object.clone()); + } + }, + WriteSetChange::DeleteResource(inner) => { + // Passing all_current_objects into the function so that we can get the owner of the deleted + // resource if it was handled in the same batch + if let Some((object, current_object)) = Object::from_delete_resource( + inner, + txn_version, + index, + &all_current_objects, + &mut conn, + ) + .unwrap() + { + all_objects.push(object.clone()); + all_current_objects + .insert(object.object_address.clone(), current_object.clone()); + } + }, + _ => {}, + } + } } // Getting list of values and sorting by pk in order to avoid postgres deadlock since we're doing multi threaded db writes let mut current_table_items = current_table_items @@ -546,7 +588,6 @@ impl TransactionProcessor for DefaultTransactionProcessor { table_metadata.sort_by(|a, b| a.handle.cmp(&b.handle)); all_current_objects.sort_by(|a, b| a.object_address.cmp(&b.object_address)); - let mut conn = self.get_conn(); let tx_result = insert_to_db( &mut conn, self.name(),