Skip to content

Commit

Permalink
fix object deletion
Browse files Browse the repository at this point in the history
  • Loading branch information
bowenyang007 committed Jul 14, 2023
1 parent 26344ea commit 56eecdf
Show file tree
Hide file tree
Showing 2 changed files with 106 additions and 46 deletions.
101 changes: 60 additions & 41 deletions crates/indexer/src/models/v2_objects.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,21 +5,27 @@
#![allow(clippy::extra_unused_lifetimes)]
#![allow(clippy::unused_unit)]

use super::token_models::v2_token_utils::ObjectWithMetadata;
use super::token_models::{
collection_datas::{QUERY_RETRIES, QUERY_RETRY_DELAY_MS},
v2_token_utils::ObjectWithMetadata,
};
use crate::{
database::PgPoolConnection,
models::move_resources::MoveResource,
schema::{current_objects, objects},
};
use aptos_api_types::{DeleteResource, Transaction, WriteResource, WriteSetChange};
use anyhow::Context;
use aptos_api_types::{DeleteResource, WriteResource};
use bigdecimal::BigDecimal;
use diesel::{prelude::*, sql_query, sql_types::Text};
use field_count::FieldCount;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;

// PK of current_objects, i.e. object_address
pub type CurrentObjectPK = String;

#[derive(Debug, Deserialize, FieldCount, Identifiable, Insertable, Serialize)]
#[derive(Clone, Debug, Deserialize, FieldCount, Identifiable, Insertable, Serialize)]
#[diesel(primary_key(transaction_version, write_set_change_index))]
#[diesel(table_name = objects)]
pub struct Object {
Expand All @@ -33,52 +39,27 @@ pub struct Object {
pub is_deleted: bool,
}

#[derive(Debug, Deserialize, FieldCount, Identifiable, Insertable, Serialize)]
#[derive(Clone, Debug, Deserialize, FieldCount, Identifiable, Insertable, Serialize)]
#[diesel(primary_key(object_address))]
#[diesel(table_name = current_objects)]
pub struct CurrentObject {
pub object_address: String,
pub owner_address: Option<String>,
pub owner_address: String,
pub state_key_hash: String,
pub allow_ungated_transfer: Option<bool>,
pub last_guid_creation_num: Option<BigDecimal>,
pub last_transaction_version: i64,
pub is_deleted: bool,
}

impl Object {
/// Only parsing 0x1 ObjectCore from transactions
pub fn from_transaction(
transaction: &Transaction,
) -> (Vec<Self>, HashMap<CurrentObjectPK, CurrentObject>) {
if let Transaction::UserTransaction(user_txn) = transaction {
let mut objects = vec![];
let mut current_objects: HashMap<String, CurrentObject> = HashMap::new();
let txn_version = user_txn.info.version.0 as i64;

for (index, wsc) in user_txn.info.changes.iter().enumerate() {
let index = index as i64;
let maybe_object_combo = match wsc {
WriteSetChange::DeleteResource(inner) => {
Self::from_delete_resource(inner, txn_version, index).unwrap()
},
WriteSetChange::WriteResource(inner) => {
Self::from_write_resource(inner, txn_version, index).unwrap()
},
_ => None,
};
if let Some((object, current_object)) = maybe_object_combo {
objects.push(object);
current_objects.insert(current_object.object_address.clone(), current_object);
}
}
(objects, current_objects)
} else {
Default::default()
}
}
#[derive(Debug, QueryableByName)]
pub struct ObjectOwner {
#[diesel(sql_type = Text)]
pub owner_address: String,
}

fn from_write_resource(
impl Object {
pub fn from_write_resource(
write_resource: &WriteResource,
txn_version: i64,
write_set_change_index: i64,
Expand All @@ -104,7 +85,7 @@ impl Object {
},
CurrentObject {
object_address: resource.address,
owner_address: Some(object_core.get_owner_address()),
owner_address: object_core.get_owner_address(),
state_key_hash: resource.state_key_hash,
allow_ungated_transfer: Some(object_core.allow_ungated_transfer),
last_guid_creation_num: Some(object_core.guid_creation_num.clone()),
Expand All @@ -120,10 +101,12 @@ impl Object {
/// This should never really happen since it's very difficult to delete the entire resource group
/// currently. We actually need a better way of detecting whether an object is deleted since there
/// is likely no delete resource write set change.
fn from_delete_resource(
pub fn from_delete_resource(
delete_resource: &DeleteResource,
txn_version: i64,
write_set_change_index: i64,
object_mapping: &HashMap<CurrentObjectPK, CurrentObject>,
conn: &mut PgPoolConnection,
) -> anyhow::Result<Option<(Self, CurrentObject)>> {
if delete_resource.resource.to_string() == "0x1::object::ObjectGroup" {
let resource = MoveResource::from_delete_resource(
Expand All @@ -132,20 +115,25 @@ impl Object {
txn_version,
0, // Placeholder, this isn't used anyway
);
let owner_address = if let Some(object) = object_mapping.get(&resource.address) {
object.owner_address.clone()
} else {
Self::get_object_owner(conn, &resource.address)?
};
Ok(Some((
Self {
transaction_version: txn_version,
write_set_change_index,
object_address: resource.address.clone(),
owner_address: None,
owner_address: Some(owner_address.clone()),
state_key_hash: resource.state_key_hash.clone(),
guid_creation_num: None,
allow_ungated_transfer: None,
is_deleted: true,
},
CurrentObject {
object_address: resource.address,
owner_address: None,
owner_address,
state_key_hash: resource.state_key_hash,
allow_ungated_transfer: None,
last_guid_creation_num: None,
Expand All @@ -157,4 +145,35 @@ impl Object {
Ok(None)
}
}

/// This is actually not great because object owner can change. The best we can do now though
fn get_object_owner(
conn: &mut PgPoolConnection,
object_address: &str,
) -> anyhow::Result<String> {
let mut retried = 0;
while retried < QUERY_RETRIES {
retried += 1;
match Self::get_by_address(conn, object_address) {
Ok(res) => return Ok(res),
Err(_) => {
std::thread::sleep(std::time::Duration::from_millis(QUERY_RETRY_DELAY_MS));
},
}
}
Err(anyhow::anyhow!("Failed to get object owner"))
}

/// TODO: Change this to a KV store
fn get_by_address(conn: &mut PgPoolConnection, object_address: &str) -> anyhow::Result<String> {
let mut res: Vec<Option<ObjectOwner>> =
sql_query("SELECT owner_address FROM current_objects WHERE object_address = $1")
.bind::<Text, _>(object_address)
.get_results(conn)?;
Ok(res
.pop()
.context("collection result empty")?
.context("collection result null")?
.owner_address)
}
}
51 changes: 46 additions & 5 deletions crates/indexer/src/processors/default_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use crate::{
},
schema,
};
use aptos_api_types::Transaction;
use aptos_api_types::{Transaction, WriteSetChange};
use async_trait::async_trait;
use diesel::{pg::upsert::excluded, result::Error, ExpressionMethods, PgConnection};
use field_count::FieldCount;
Expand Down Expand Up @@ -481,6 +481,8 @@ impl TransactionProcessor for DefaultTransactionProcessor {
start_version: u64,
end_version: u64,
) -> Result<ProcessingResult, TransactionProcessingError> {
let mut conn = self.get_conn();

let (txns, txn_details, events, write_set_changes, wsc_details) =
TransactionModel::from_transactions(&transactions);

Expand Down Expand Up @@ -527,9 +529,49 @@ impl TransactionProcessor for DefaultTransactionProcessor {
let mut all_objects = vec![];
let mut all_current_objects = HashMap::new();
for txn in &transactions {
let (mut objects, current_objects) = Object::from_transaction(txn);
all_objects.append(&mut objects);
all_current_objects.extend(current_objects);
let (changes, txn_version) = match txn {
Transaction::UserTransaction(user_txn) => (
user_txn.info.changes.clone(),
user_txn.info.version.0 as i64,
),
Transaction::BlockMetadataTransaction(bmt_txn) => {
(bmt_txn.info.changes.clone(), bmt_txn.info.version.0 as i64)
},
_ => continue,
};

for (index, wsc) in changes.iter().enumerate() {
let index = index as i64;
match wsc {
WriteSetChange::WriteResource(inner) => {
if let Some((object, current_object)) =
&Object::from_write_resource(inner, txn_version, index).unwrap()
{
all_objects.push(object.clone());
all_current_objects
.insert(object.object_address.clone(), current_object.clone());
}
},
WriteSetChange::DeleteResource(inner) => {
// Passing all_current_objects into the function so that we can get the owner of the deleted
// resource if it was handled in the same batch
if let Some((object, current_object)) = Object::from_delete_resource(
inner,
txn_version,
index,
&all_current_objects,
&mut conn,
)
.unwrap()
{
all_objects.push(object.clone());
all_current_objects
.insert(object.object_address.clone(), current_object.clone());
}
},
_ => {},
}
}
}
// Getting list of values and sorting by pk in order to avoid postgres deadlock since we're doing multi threaded db writes
let mut current_table_items = current_table_items
Expand All @@ -546,7 +588,6 @@ impl TransactionProcessor for DefaultTransactionProcessor {
table_metadata.sort_by(|a, b| a.handle.cmp(&b.handle));
all_current_objects.sort_by(|a, b| a.object_address.cmp(&b.object_address));

let mut conn = self.get_conn();
let tx_result = insert_to_db(
&mut conn,
self.name(),
Expand Down

0 comments on commit 56eecdf

Please sign in to comment.