From fa96bee4f7dda949fa59fdf3591ec7ab945abe0e Mon Sep 17 00:00:00 2001 From: bowenyang007 Date: Tue, 18 Jul 2023 13:12:22 -0700 Subject: [PATCH] make changes to old indexer --- .../down.sql | 10 ++ .../up.sql | 20 +++ .../coin_models/account_transactions.rs | 136 +++++++++++++++ crates/indexer/src/models/coin_models/mod.rs | 1 + .../indexer/src/models/user_transactions.rs | 35 ++-- crates/indexer/src/models/v2_objects.rs | 157 +++++++++++------- .../indexer/src/processors/coin_processor.rs | 46 ++++- .../src/processors/default_processor.rs | 51 +++++- crates/indexer/src/schema.rs | 16 +- 9 files changed, 390 insertions(+), 82 deletions(-) create mode 100644 crates/indexer/migrations/2023-07-13-060328_transactions_by_address/down.sql create mode 100644 crates/indexer/migrations/2023-07-13-060328_transactions_by_address/up.sql create mode 100644 crates/indexer/src/models/coin_models/account_transactions.rs diff --git a/crates/indexer/migrations/2023-07-13-060328_transactions_by_address/down.sql b/crates/indexer/migrations/2023-07-13-060328_transactions_by_address/down.sql new file mode 100644 index 00000000000000..4f1714472cfee5 --- /dev/null +++ b/crates/indexer/migrations/2023-07-13-060328_transactions_by_address/down.sql @@ -0,0 +1,10 @@ +-- This file should undo anything in `up.sql` +DROP INDEX IF EXISTS at_version_index; +DROP INDEX IF EXISTS at_insat_index; +DROP TABLE IF EXISTS account_transactions; +ALTER TABLE objects +ALTER COLUMN owner_address DROP NOT NULL; +ALTER TABLE objects +ALTER COLUMN guid_creation_num DROP NOT NULL; +ALTER TABLE objects +ALTER COLUMN allow_ungated_transfer DROP NOT NULL; \ No newline at end of file diff --git a/crates/indexer/migrations/2023-07-13-060328_transactions_by_address/up.sql b/crates/indexer/migrations/2023-07-13-060328_transactions_by_address/up.sql new file mode 100644 index 00000000000000..8672514c8c3775 --- /dev/null +++ b/crates/indexer/migrations/2023-07-13-060328_transactions_by_address/up.sql @@ -0,0 +1,20 @@ +-- Your SQL goes here +-- Records transactions - account pairs. Account here can represent +-- user account, resource account, or object account. +CREATE TABLE IF NOT EXISTS account_transactions ( + transaction_version BIGINT NOT NULL, + account_address VARCHAR(66) NOT NULL, + inserted_at TIMESTAMP NOT NULL DEFAULT NOW(), + PRIMARY KEY (account_address, transaction_version) +); +CREATE INDEX IF NOT EXISTS at_version_index ON account_transactions (transaction_version DESC); +CREATE INDEX IF NOT EXISTS at_insat_index ON account_transactions (inserted_at); +ALTER TABLE objects +ALTER COLUMN owner_address +SET NOT NULL; +ALTER TABLE objects +ALTER COLUMN guid_creation_num +SET NOT NULL; +ALTER TABLE objects +ALTER COLUMN allow_ungated_transfer +SET NOT NULL; \ No newline at end of file diff --git a/crates/indexer/src/models/coin_models/account_transactions.rs b/crates/indexer/src/models/coin_models/account_transactions.rs new file mode 100644 index 00000000000000..cea220f92632f0 --- /dev/null +++ b/crates/indexer/src/models/coin_models/account_transactions.rs @@ -0,0 +1,136 @@ +// Copyright © Aptos Foundation +// SPDX-License-Identifier: Apache-2.0 + +// This is required because a diesel macro makes clippy sad +#![allow(clippy::extra_unused_lifetimes)] +#![allow(clippy::unused_unit)] + +use crate::{ + models::{ + token_models::v2_token_utils::ObjectWithMetadata, user_transactions::UserTransaction, + }, + schema::account_transactions, + util::standardize_address, +}; +use aptos_api_types::{DeleteResource, Event, Transaction, WriteResource, WriteSetChange}; +use field_count::FieldCount; +use serde::{Deserialize, Serialize}; +use std::collections::HashMap; + +pub type AccountTransactionPK = (String, i64); + +#[derive(Debug, Deserialize, FieldCount, Identifiable, Insertable, Serialize)] +#[diesel(primary_key(account_address, transaction_version))] +#[diesel(table_name = account_transactions)] +pub struct AccountTransaction { + pub transaction_version: i64, + pub account_address: String, +} + +impl AccountTransaction { + /// This table will record every transaction that touch an account which could be + /// a user account, an object, or a resource account. + /// We will consider all transactions that modify a resource or event associated with a particular account. + /// We will do 1 level of redirection for now (e.g. if it's an object, we will record the owner as account address). + /// We will also consider transactions that the account signed or is part of a multi sig / multi agent. + /// TODO: recursively find the parent account of an object + /// TODO: include table items in the detection path + pub fn from_transaction( + transaction: &Transaction, + ) -> anyhow::Result> { + let (events, wscs, signatures, txn_version) = match transaction { + Transaction::UserTransaction(inner) => ( + &inner.events, + &inner.info.changes, + UserTransaction::get_signatures(inner, inner.info.version.0 as i64, 0), + inner.info.version.0 as i64, + ), + Transaction::GenesisTransaction(inner) => ( + &inner.events, + &inner.info.changes, + vec![], + inner.info.version.0 as i64, + ), + Transaction::BlockMetadataTransaction(inner) => ( + &inner.events, + &inner.info.changes, + vec![], + inner.info.version.0 as i64, + ), + _ => { + return Ok(HashMap::new()); + }, + }; + let mut account_transactions = HashMap::new(); + for sig in &signatures { + account_transactions.insert((sig.signer.clone(), txn_version), Self { + transaction_version: txn_version, + account_address: sig.signer.clone(), + }); + } + for event in events { + account_transactions.extend(Self::from_event(event, txn_version)); + } + for wsc in wscs { + match wsc { + WriteSetChange::DeleteResource(res) => { + account_transactions.extend(Self::from_delete_resource(res, txn_version)?); + }, + WriteSetChange::WriteResource(res) => { + account_transactions.extend(Self::from_write_resource(res, txn_version)?); + }, + _ => {}, + } + } + Ok(account_transactions) + } + + /// Base case, record event account address. We don't really have to worry about + /// objects here because it'll be taken care of in the resource section + fn from_event(event: &Event, txn_version: i64) -> HashMap { + let account_address = standardize_address(&event.guid.account_address.to_string()); + HashMap::from([((account_address.clone(), txn_version), Self { + transaction_version: txn_version, + account_address, + })]) + } + + /// Base case, record resource account. If the resource is an object, then we record the owner as well + /// This handles partial deletes as well + fn from_write_resource( + write_resource: &WriteResource, + txn_version: i64, + ) -> anyhow::Result> { + let mut result = HashMap::new(); + let account_address = standardize_address(&write_resource.address.to_string()); + result.insert((account_address.clone(), txn_version), Self { + transaction_version: txn_version, + account_address, + }); + if let Some(inner) = &ObjectWithMetadata::from_write_resource(write_resource, txn_version)? + { + result.insert((inner.object_core.get_owner_address(), txn_version), Self { + transaction_version: txn_version, + account_address: inner.object_core.get_owner_address(), + }); + } + Ok(result) + } + + /// Base case, record resource account. + /// TODO: If the resource is an object, then we need to look for the latest owner. This isn't really possible + /// right now given we have parallel threads so it'll be very difficult to ensure that we have the correct + /// latest owner + fn from_delete_resource( + delete_resource: &DeleteResource, + txn_version: i64, + ) -> anyhow::Result> { + let mut result = HashMap::new(); + let account_address = standardize_address(&delete_resource.address.to_string()); + result.insert((account_address.clone(), txn_version), Self { + transaction_version: txn_version, + account_address, + }); + Ok(result) + } +} diff --git a/crates/indexer/src/models/coin_models/mod.rs b/crates/indexer/src/models/coin_models/mod.rs index f898fec32f3815..81fcbf9aa0a911 100644 --- a/crates/indexer/src/models/coin_models/mod.rs +++ b/crates/indexer/src/models/coin_models/mod.rs @@ -1,6 +1,7 @@ // Copyright © Aptos Foundation // SPDX-License-Identifier: Apache-2.0 +pub mod account_transactions; pub mod coin_activities; pub mod coin_balances; pub mod coin_infos; diff --git a/crates/indexer/src/models/user_transactions.rs b/crates/indexer/src/models/user_transactions.rs index 3cdc3f639b5a32..b5e7f92ba919f7 100644 --- a/crates/indexer/src/models/user_transactions.rs +++ b/crates/indexer/src/models/user_transactions.rs @@ -92,21 +92,30 @@ impl UserTransaction { }, epoch, }, - txn.request - .signature - .as_ref() - .map(|s| { - Signature::from_user_transaction( - s, - &txn.request.sender.to_string(), - version, - block_height, - ) - .unwrap() - }) - .unwrap_or_default(), // empty vec if signature is None + Self::get_signatures(txn, version, block_height), ) } + + /// Empty vec if signature is None + pub fn get_signatures( + txn: &APIUserTransaction, + version: i64, + block_height: i64, + ) -> Vec { + txn.request + .signature + .as_ref() + .map(|s| { + Signature::from_user_transaction( + s, + &txn.request.sender.to_string(), + version, + block_height, + ) + .unwrap() + }) + .unwrap_or_default() // empty vec if signature is None + } } // Prevent conflicts with other things named `Transaction` diff --git a/crates/indexer/src/models/v2_objects.rs b/crates/indexer/src/models/v2_objects.rs index 517450bebfc13c..f6608b0646f3b5 100644 --- a/crates/indexer/src/models/v2_objects.rs +++ b/crates/indexer/src/models/v2_objects.rs @@ -5,13 +5,18 @@ #![allow(clippy::extra_unused_lifetimes)] #![allow(clippy::unused_unit)] -use super::token_models::v2_token_utils::ObjectWithMetadata; +use super::token_models::{ + collection_datas::{QUERY_RETRIES, QUERY_RETRY_DELAY_MS}, + v2_token_utils::ObjectWithMetadata, +}; use crate::{ + database::PgPoolConnection, models::move_resources::MoveResource, schema::{current_objects, objects}, }; -use aptos_api_types::{DeleteResource, Transaction, WriteResource, WriteSetChange}; +use aptos_api_types::{DeleteResource, WriteResource}; use bigdecimal::BigDecimal; +use diesel::prelude::*; use field_count::FieldCount; use serde::{Deserialize, Serialize}; use std::collections::HashMap; @@ -19,66 +24,49 @@ use std::collections::HashMap; // PK of current_objects, i.e. object_address pub type CurrentObjectPK = String; -#[derive(Debug, Deserialize, FieldCount, Identifiable, Insertable, Serialize)] +#[derive(Clone, Debug, Deserialize, FieldCount, Identifiable, Insertable, Serialize)] #[diesel(primary_key(transaction_version, write_set_change_index))] #[diesel(table_name = objects)] pub struct Object { pub transaction_version: i64, pub write_set_change_index: i64, pub object_address: String, - pub owner_address: Option, + pub owner_address: String, pub state_key_hash: String, - pub guid_creation_num: Option, - pub allow_ungated_transfer: Option, + pub guid_creation_num: BigDecimal, + pub allow_ungated_transfer: bool, pub is_deleted: bool, } -#[derive(Debug, Deserialize, FieldCount, Identifiable, Insertable, Serialize)] +#[derive(Clone, Debug, Deserialize, FieldCount, Identifiable, Insertable, Serialize)] #[diesel(primary_key(object_address))] #[diesel(table_name = current_objects)] pub struct CurrentObject { pub object_address: String, - pub owner_address: Option, + pub owner_address: String, pub state_key_hash: String, - pub allow_ungated_transfer: Option, - pub last_guid_creation_num: Option, + pub allow_ungated_transfer: bool, + pub last_guid_creation_num: BigDecimal, pub last_transaction_version: i64, pub is_deleted: bool, } -impl Object { - /// Only parsing 0x1 ObjectCore from transactions - pub fn from_transaction( - transaction: &Transaction, - ) -> (Vec, HashMap) { - if let Transaction::UserTransaction(user_txn) = transaction { - let mut objects = vec![]; - let mut current_objects: HashMap = HashMap::new(); - let txn_version = user_txn.info.version.0 as i64; - - for (index, wsc) in user_txn.info.changes.iter().enumerate() { - let index = index as i64; - let maybe_object_combo = match wsc { - WriteSetChange::DeleteResource(inner) => { - Self::from_delete_resource(inner, txn_version, index).unwrap() - }, - WriteSetChange::WriteResource(inner) => { - Self::from_write_resource(inner, txn_version, index).unwrap() - }, - _ => None, - }; - if let Some((object, current_object)) = maybe_object_combo { - objects.push(object); - current_objects.insert(current_object.object_address.clone(), current_object); - } - } - (objects, current_objects) - } else { - Default::default() - } - } +#[derive(Debug, Deserialize, Identifiable, Queryable, Serialize)] +#[diesel(primary_key(object_address))] +#[diesel(table_name = current_objects)] +pub struct CurrentObjectQuery { + pub object_address: String, + pub owner_address: String, + pub state_key_hash: String, + pub allow_ungated_transfer: bool, + pub last_guid_creation_num: BigDecimal, + pub last_transaction_version: i64, + pub is_deleted: bool, + pub inserted_at: chrono::NaiveDateTime, +} - fn from_write_resource( +impl Object { + pub fn from_write_resource( write_resource: &WriteResource, txn_version: i64, write_set_change_index: i64, @@ -96,18 +84,18 @@ impl Object { transaction_version: txn_version, write_set_change_index, object_address: resource.address.clone(), - owner_address: Some(object_core.get_owner_address()), + owner_address: object_core.get_owner_address(), state_key_hash: resource.state_key_hash.clone(), - guid_creation_num: Some(object_core.guid_creation_num.clone()), - allow_ungated_transfer: Some(object_core.allow_ungated_transfer), + guid_creation_num: object_core.guid_creation_num.clone(), + allow_ungated_transfer: object_core.allow_ungated_transfer, is_deleted: false, }, CurrentObject { object_address: resource.address, - owner_address: Some(object_core.get_owner_address()), + owner_address: object_core.get_owner_address(), state_key_hash: resource.state_key_hash, - allow_ungated_transfer: Some(object_core.allow_ungated_transfer), - last_guid_creation_num: Some(object_core.guid_creation_num.clone()), + allow_ungated_transfer: object_core.allow_ungated_transfer, + last_guid_creation_num: object_core.guid_creation_num.clone(), last_transaction_version: txn_version, is_deleted: false, }, @@ -120,35 +108,52 @@ impl Object { /// This should never really happen since it's very difficult to delete the entire resource group /// currently. We actually need a better way of detecting whether an object is deleted since there /// is likely no delete resource write set change. - fn from_delete_resource( + pub fn from_delete_resource( delete_resource: &DeleteResource, txn_version: i64, write_set_change_index: i64, + object_mapping: &HashMap, + conn: &mut PgPoolConnection, ) -> anyhow::Result> { - if delete_resource.resource.to_string() == "0x1::object::ObjectCore" { + if delete_resource.resource.to_string() == "0x1::object::ObjectGroup" { let resource = MoveResource::from_delete_resource( delete_resource, 0, // Placeholder, this isn't used anyway txn_version, 0, // Placeholder, this isn't used anyway ); + let previous_object = if let Some(object) = object_mapping.get(&resource.address) { + object.clone() + } else { + match Self::get_object_owner(conn, &resource.address) { + Ok(owner) => owner, + Err(_) => { + aptos_logger::error!( + transaction_version = txn_version, + lookup_key = &resource.address, + "Missing object owner for object. You probably should backfill db.", + ); + return Ok(None); + }, + } + }; Ok(Some(( Self { transaction_version: txn_version, write_set_change_index, object_address: resource.address.clone(), - owner_address: None, + owner_address: previous_object.owner_address.clone(), state_key_hash: resource.state_key_hash.clone(), - guid_creation_num: None, - allow_ungated_transfer: None, + guid_creation_num: previous_object.last_guid_creation_num.clone(), + allow_ungated_transfer: previous_object.allow_ungated_transfer, is_deleted: true, }, CurrentObject { object_address: resource.address, - owner_address: None, + owner_address: previous_object.owner_address.clone(), state_key_hash: resource.state_key_hash, - allow_ungated_transfer: None, - last_guid_creation_num: None, + last_guid_creation_num: previous_object.last_guid_creation_num.clone(), + allow_ungated_transfer: previous_object.allow_ungated_transfer, last_transaction_version: txn_version, is_deleted: true, }, @@ -157,4 +162,44 @@ impl Object { Ok(None) } } + + /// This is actually not great because object owner can change. The best we can do now though + fn get_object_owner( + conn: &mut PgPoolConnection, + object_address: &str, + ) -> anyhow::Result { + let mut retried = 0; + while retried < QUERY_RETRIES { + retried += 1; + match CurrentObjectQuery::get_by_address(object_address, conn) { + Ok(res) => { + return Ok(CurrentObject { + object_address: res.object_address, + owner_address: res.owner_address, + state_key_hash: res.state_key_hash, + allow_ungated_transfer: res.allow_ungated_transfer, + last_guid_creation_num: res.last_guid_creation_num, + last_transaction_version: res.last_transaction_version, + is_deleted: res.is_deleted, + }) + }, + Err(_) => { + std::thread::sleep(std::time::Duration::from_millis(QUERY_RETRY_DELAY_MS)); + }, + } + } + Err(anyhow::anyhow!("Failed to get object owner")) + } +} + +impl CurrentObjectQuery { + /// TODO: Change this to a KV store + pub fn get_by_address( + object_address: &str, + conn: &mut PgPoolConnection, + ) -> diesel::QueryResult { + current_objects::table + .filter(current_objects::object_address.eq(object_address)) + .first::(conn) + } } diff --git a/crates/indexer/src/processors/coin_processor.rs b/crates/indexer/src/processors/coin_processor.rs index 3796b05d10843d..9c65d42271388e 100644 --- a/crates/indexer/src/processors/coin_processor.rs +++ b/crates/indexer/src/processors/coin_processor.rs @@ -10,6 +10,7 @@ use crate::{ transaction_processor::TransactionProcessor, }, models::coin_models::{ + account_transactions::AccountTransaction, coin_activities::{CoinActivity, CurrentCoinBalancePK}, coin_balances::{CoinBalance, CurrentCoinBalance}, coin_infos::{CoinInfo, CoinInfoQuery}, @@ -53,12 +54,14 @@ fn insert_to_db_impl( coin_balances: &[CoinBalance], current_coin_balances: &[CurrentCoinBalance], coin_supply: &[CoinSupply], + account_transactions: &[AccountTransaction], ) -> Result<(), diesel::result::Error> { insert_coin_activities(conn, coin_activities)?; insert_coin_infos(conn, coin_infos)?; insert_coin_balances(conn, coin_balances)?; insert_current_coin_balances(conn, current_coin_balances)?; insert_coin_supply(conn, coin_supply)?; + insert_account_transactions(conn, account_transactions)?; Ok(()) } @@ -72,6 +75,7 @@ fn insert_to_db( coin_balances: Vec, current_coin_balances: Vec, coin_supply: Vec, + account_transactions: Vec, ) -> Result<(), diesel::result::Error> { aptos_logger::trace!( name = name, @@ -90,6 +94,7 @@ fn insert_to_db( &coin_balances, ¤t_coin_balances, &coin_supply, + &account_transactions, ) }) { Ok(_) => Ok(()), @@ -101,6 +106,8 @@ fn insert_to_db( let coin_infos = clean_data_for_db(coin_infos, true); let coin_balances = clean_data_for_db(coin_balances, true); let current_coin_balances = clean_data_for_db(current_coin_balances, true); + let coin_supply = clean_data_for_db(coin_supply, true); + let account_transactions = clean_data_for_db(account_transactions, true); insert_to_db_impl( pg_conn, @@ -109,6 +116,7 @@ fn insert_to_db( &coin_balances, ¤t_coin_balances, &coin_supply, + &account_transactions, ) }), } @@ -132,11 +140,7 @@ fn insert_coin_activities( event_creation_number, event_sequence_number, )) - .do_update() - .set(( - inserted_at.eq(excluded(inserted_at)), - event_index.eq(excluded(event_index)), - )), + .do_nothing(), None, )?; } @@ -240,6 +244,26 @@ fn insert_coin_supply( Ok(()) } +fn insert_account_transactions( + conn: &mut PgConnection, + item_to_insert: &[AccountTransaction], +) -> Result<(), diesel::result::Error> { + use schema::account_transactions::dsl::*; + + let chunks = get_chunks(item_to_insert.len(), AccountTransaction::field_count()); + for (start_ind, end_ind) in chunks { + execute_with_better_error( + conn, + diesel::insert_into(schema::account_transactions::table) + .values(&item_to_insert[start_ind..end_ind]) + .on_conflict((transaction_version, account_address)) + .do_nothing(), + None, + )?; + } + Ok(()) +} + #[async_trait] impl TransactionProcessor for CoinTransactionProcessor { fn name(&self) -> &'static str { @@ -265,6 +289,8 @@ impl TransactionProcessor for CoinTransactionProcessor { HashMap::new(); let mut all_coin_supply = vec![]; + let mut account_transactions = HashMap::new(); + for txn in &transactions { let ( mut coin_activities, @@ -281,17 +307,26 @@ impl TransactionProcessor for CoinTransactionProcessor { all_coin_infos.entry(key).or_insert(value); } all_current_coin_balances.extend(current_coin_balances); + + account_transactions.extend(AccountTransaction::from_transaction(txn).unwrap()); } let mut all_coin_infos = all_coin_infos.into_values().collect::>(); let mut all_current_coin_balances = all_current_coin_balances .into_values() .collect::>(); + let mut account_transactions = account_transactions + .into_values() + .collect::>(); // Sort by PK all_coin_infos.sort_by(|a, b| a.coin_type.cmp(&b.coin_type)); all_current_coin_balances.sort_by(|a, b| { (&a.owner_address, &a.coin_type).cmp(&(&b.owner_address, &b.coin_type)) }); + account_transactions.sort_by(|a, b| { + (&a.transaction_version, &a.account_address) + .cmp(&(&b.transaction_version, &b.account_address)) + }); let tx_result = insert_to_db( &mut conn, @@ -303,6 +338,7 @@ impl TransactionProcessor for CoinTransactionProcessor { all_coin_balances, all_current_coin_balances, all_coin_supply, + account_transactions, ); match tx_result { Ok(_) => Ok(ProcessingResult::new( diff --git a/crates/indexer/src/processors/default_processor.rs b/crates/indexer/src/processors/default_processor.rs index dd9294e4ad6990..2a65adb6448214 100644 --- a/crates/indexer/src/processors/default_processor.rs +++ b/crates/indexer/src/processors/default_processor.rs @@ -23,7 +23,7 @@ use crate::{ }, schema, }; -use aptos_api_types::Transaction; +use aptos_api_types::{Transaction, WriteSetChange}; use async_trait::async_trait; use diesel::{pg::upsert::excluded, result::Error, ExpressionMethods, PgConnection}; use field_count::FieldCount; @@ -481,6 +481,8 @@ impl TransactionProcessor for DefaultTransactionProcessor { start_version: u64, end_version: u64, ) -> Result { + let mut conn = self.get_conn(); + let (txns, txn_details, events, write_set_changes, wsc_details) = TransactionModel::from_transactions(&transactions); @@ -527,9 +529,49 @@ impl TransactionProcessor for DefaultTransactionProcessor { let mut all_objects = vec![]; let mut all_current_objects = HashMap::new(); for txn in &transactions { - let (mut objects, current_objects) = Object::from_transaction(txn); - all_objects.append(&mut objects); - all_current_objects.extend(current_objects); + let (changes, txn_version) = match txn { + Transaction::UserTransaction(user_txn) => ( + user_txn.info.changes.clone(), + user_txn.info.version.0 as i64, + ), + Transaction::BlockMetadataTransaction(bmt_txn) => { + (bmt_txn.info.changes.clone(), bmt_txn.info.version.0 as i64) + }, + _ => continue, + }; + + for (index, wsc) in changes.iter().enumerate() { + let index = index as i64; + match wsc { + WriteSetChange::WriteResource(inner) => { + if let Some((object, current_object)) = + &Object::from_write_resource(inner, txn_version, index).unwrap() + { + all_objects.push(object.clone()); + all_current_objects + .insert(object.object_address.clone(), current_object.clone()); + } + }, + WriteSetChange::DeleteResource(inner) => { + // Passing all_current_objects into the function so that we can get the owner of the deleted + // resource if it was handled in the same batch + if let Some((object, current_object)) = Object::from_delete_resource( + inner, + txn_version, + index, + &all_current_objects, + &mut conn, + ) + .unwrap() + { + all_objects.push(object.clone()); + all_current_objects + .insert(object.object_address.clone(), current_object.clone()); + } + }, + _ => {}, + } + } } // Getting list of values and sorting by pk in order to avoid postgres deadlock since we're doing multi threaded db writes let mut current_table_items = current_table_items @@ -546,7 +588,6 @@ impl TransactionProcessor for DefaultTransactionProcessor { table_metadata.sort_by(|a, b| a.handle.cmp(&b.handle)); all_current_objects.sort_by(|a, b| a.object_address.cmp(&b.object_address)); - let mut conn = self.get_conn(); let tx_result = insert_to_db( &mut conn, self.name(), diff --git a/crates/indexer/src/schema.rs b/crates/indexer/src/schema.rs index f5d9ab795ac080..428cd26be9aba6 100644 --- a/crates/indexer/src/schema.rs +++ b/crates/indexer/src/schema.rs @@ -2,6 +2,15 @@ // @generated automatically by Diesel CLI. +diesel::table! { + account_transactions (account_address, transaction_version) { + transaction_version -> Int8, + #[max_length = 66] + account_address -> Varchar, + inserted_at -> Timestamp, + } +} + diesel::table! { block_metadata_transactions (version) { version -> Int8, @@ -583,11 +592,11 @@ diesel::table! { #[max_length = 66] object_address -> Varchar, #[max_length = 66] - owner_address -> Nullable, + owner_address -> Varchar, #[max_length = 66] state_key_hash -> Varchar, - guid_creation_num -> Nullable, - allow_ungated_transfer -> Nullable, + guid_creation_num -> Numeric, + allow_ungated_transfer -> Bool, is_deleted -> Bool, inserted_at -> Timestamp, } @@ -927,6 +936,7 @@ diesel::table! { } diesel::allow_tables_to_appear_in_same_query!( + account_transactions, block_metadata_transactions, coin_activities, coin_balances,