Skip to content

Commit

Permalink
[parquet-sdk]migrate objects [pt1]
Browse files Browse the repository at this point in the history
  • Loading branch information
yuunlimm committed Dec 11, 2024
1 parent 2cf9dfd commit 70d8577
Show file tree
Hide file tree
Showing 29 changed files with 507 additions and 301 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,12 @@ use aptos_protos::transaction::v1::{transaction::TxnData, write_set_change::Chan
use diesel::{Identifiable, Insertable, Queryable};
use field_count::FieldCount;
use processor::{
db::postgres::models::{
object_models::v2_object_utils::ObjectWithMetadata, resources::FromWriteResource,
user_transactions_models::user_transactions::UserTransaction,
db::{
common::models::object_models::v2_object_utils::ObjectWithMetadata,
postgres::models::{
resources::FromWriteResource,
user_transactions_models::user_transactions::UserTransaction,
},
},
schema::account_transactions,
utils::{counters::PROCESSOR_UNKNOWN_TYPE_COUNT, util::standardize_address},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,12 @@
#![allow(clippy::unused_unit)]

use crate::{
db::postgres::models::{
object_models::v2_object_utils::ObjectWithMetadata, resources::FromWriteResource,
user_transactions_models::user_transactions::UserTransaction,
db::{
common::models::object_models::v2_object_utils::ObjectWithMetadata,
postgres::models::{
resources::FromWriteResource,
user_transactions_models::user_transactions::UserTransaction,
},
},
utils::{counters::PROCESSOR_UNKNOWN_TYPE_COUNT, util::standardize_address},
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,16 @@

use crate::{
db::{
common::models::token_v2_models::v2_token_utils::TokenStandard,
common::models::{
object_models::v2_object_utils::ObjectAggregatedDataMapping,
token_v2_models::v2_token_utils::TokenStandard,
},
postgres::models::{
coin_models::{
coin_activities::CoinActivity,
coin_utils::{CoinEvent, CoinInfoType, EventGuidResource},
},
fungible_asset_models::v2_fungible_asset_utils::{FeeStatement, FungibleAssetEvent},
object_models::v2_object_utils::ObjectAggregatedDataMapping,
},
},
utils::util::standardize_address,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,12 @@ use crate::{
db::{
common::models::{
fungible_asset_models::raw_v2_fungible_asset_activities::EventToCoinType,
object_models::v2_object_utils::ObjectAggregatedDataMapping,
token_v2_models::v2_token_utils::{TokenStandard, V2_STANDARD},
},
postgres::models::{
coin_models::coin_utils::{CoinInfoType, CoinResource},
fungible_asset_models::v2_fungible_asset_utils::FungibleAssetStore,
object_models::v2_object_utils::ObjectAggregatedDataMapping,
resources::FromWriteResource,
},
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,13 @@

use crate::{
db::{
common::models::token_v2_models::v2_token_utils::TokenStandard,
common::models::{
object_models::v2_object_utils::ObjectAggregatedDataMapping,
token_v2_models::v2_token_utils::TokenStandard,
},
postgres::models::{
coin_models::coin_utils::{CoinInfoType, CoinResource},
fungible_asset_models::v2_fungible_asset_utils::FungibleAssetMetadata,
object_models::v2_object_utils::ObjectAggregatedDataMapping,
resources::FromWriteResource,
},
},
Expand Down
1 change: 1 addition & 0 deletions rust/processor/src/db/common/models/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,5 @@ pub mod ans_models;
pub mod default_models;
pub mod event_models;
pub mod fungible_asset_models;
pub mod object_models;
pub mod token_v2_models;
5 changes: 5 additions & 0 deletions rust/processor/src/db/common/models/object_models/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
// Copyright © Aptos Foundation
// SPDX-License-Identifier: Apache-2.0

pub mod v2_object_utils;
pub mod v2_objects;
238 changes: 238 additions & 0 deletions rust/processor/src/db/common/models/object_models/v2_objects.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,238 @@
// Copyright © Aptos Foundation
// SPDX-License-Identifier: Apache-2.0

// This is required because a diesel macro makes clippy sad
#![allow(clippy::extra_unused_lifetimes)]
#![allow(clippy::unused_unit)]

use super::v2_object_utils::{CurrentObjectPK, ObjectAggregatedDataMapping};
use crate::{
db::postgres::models::default_models::move_resources::MoveResource,
schema::current_objects,
utils::{database::DbPoolConnection, util::standardize_address},
};
use ahash::AHashMap;
use aptos_protos::transaction::v1::{DeleteResource, WriteResource};
use bigdecimal::BigDecimal;
use diesel::prelude::*;
use diesel_async::RunQueryDsl;
use field_count::FieldCount;
use serde::{Deserialize, Serialize};

#[derive(Clone, Debug, Deserialize, FieldCount, Serialize)]
pub struct RawObject {
pub transaction_version: i64,
pub write_set_change_index: i64,
pub object_address: String,
pub owner_address: String,
pub state_key_hash: String,
pub guid_creation_num: BigDecimal,
pub allow_ungated_transfer: bool,
pub is_deleted: bool,
pub untransferrable: bool,
}

pub trait ObjectConvertible {
fn from_raw(raw_item: RawObject) -> Self;
}

#[derive(Clone, Debug, Deserialize, FieldCount, Serialize)]
pub struct RawCurrentObject {
pub object_address: String,
pub owner_address: String,
pub state_key_hash: String,
pub allow_ungated_transfer: bool,
pub last_guid_creation_num: BigDecimal,
pub last_transaction_version: i64,
pub is_deleted: bool,
pub untransferrable: bool,
}

pub trait CurrentObjectConvertible {
fn from_raw(raw_item: RawCurrentObject) -> Self;
}

#[derive(Debug, Deserialize, Identifiable, Queryable, Serialize)]
#[diesel(primary_key(object_address))]
#[diesel(table_name = current_objects)]
pub struct CurrentObjectQuery {
pub object_address: String,
pub owner_address: String,
pub state_key_hash: String,
pub allow_ungated_transfer: bool,
pub last_guid_creation_num: BigDecimal,
pub last_transaction_version: i64,
pub is_deleted: bool,
pub inserted_at: chrono::NaiveDateTime,
pub untransferrable: bool,
}

impl RawObject {
pub fn from_write_resource(
write_resource: &WriteResource,
txn_version: i64,
write_set_change_index: i64,
object_metadata_mapping: &ObjectAggregatedDataMapping,
) -> anyhow::Result<Option<(Self, RawCurrentObject)>> {
let address = standardize_address(&write_resource.address.to_string());
println!("address: {:?}", address);
if let Some(object_aggregated_metadata) = object_metadata_mapping.get(&address) {
println!(
"object_aggregated_metadata: {:?}",
object_aggregated_metadata
);
// do something
let object_with_metadata = object_aggregated_metadata.object.clone();
let object_core = object_with_metadata.object_core;

let untransferrable = if object_aggregated_metadata.untransferable.as_ref().is_some() {
true
} else {
!object_core.allow_ungated_transfer
};
Ok(Some((
Self {
transaction_version: txn_version,
write_set_change_index,
object_address: address.clone(),
owner_address: object_core.get_owner_address(),
state_key_hash: object_with_metadata.state_key_hash.clone(),
guid_creation_num: object_core.guid_creation_num.clone(),
allow_ungated_transfer: object_core.allow_ungated_transfer,
is_deleted: false,
untransferrable,
},
RawCurrentObject {
object_address: address,
owner_address: object_core.get_owner_address(),
state_key_hash: object_with_metadata.state_key_hash,
allow_ungated_transfer: object_core.allow_ungated_transfer,
last_guid_creation_num: object_core.guid_creation_num.clone(),
last_transaction_version: txn_version,
is_deleted: false,
untransferrable,
},
)))
} else {
Ok(None)
}
}

/// This handles the case where the entire object is deleted
/// TODO: We need to detect if an object is only partially deleted
/// using KV store
pub async fn from_delete_resource(
delete_resource: &DeleteResource,
txn_version: i64,
write_set_change_index: i64,
object_mapping: &AHashMap<CurrentObjectPK, RawCurrentObject>,
conn: &mut DbPoolConnection<'_>,
query_retries: u32,
query_retry_delay_ms: u64,
) -> anyhow::Result<Option<(Self, RawCurrentObject)>> {
if delete_resource.type_str == "0x1::object::ObjectGroup" {
let resource = MoveResource::from_delete_resource(
delete_resource,
0, // Placeholder, this isn't used anyway
txn_version,
0, // Placeholder, this isn't used anyway
);
let previous_object = if let Some(object) = object_mapping.get(&resource.address) {
object.clone()
} else {
match Self::get_current_object(
conn,
&resource.address,
query_retries,
query_retry_delay_ms,
)
.await
{
Ok(object) => object,
Err(_) => {
tracing::error!(
transaction_version = txn_version,
lookup_key = &resource.address,
"Missing current_object for object_address: {}. You probably should backfill db.",
resource.address,
);
return Ok(None);
},
}
};
Ok(Some((
Self {
transaction_version: txn_version,
write_set_change_index,
object_address: resource.address.clone(),
owner_address: previous_object.owner_address.clone(),
state_key_hash: resource.state_key_hash.clone(),
guid_creation_num: previous_object.last_guid_creation_num.clone(),
allow_ungated_transfer: previous_object.allow_ungated_transfer,
is_deleted: true,
untransferrable: previous_object.untransferrable,
},
RawCurrentObject {
object_address: resource.address,
owner_address: previous_object.owner_address.clone(),
state_key_hash: resource.state_key_hash,
last_guid_creation_num: previous_object.last_guid_creation_num.clone(),
allow_ungated_transfer: previous_object.allow_ungated_transfer,
last_transaction_version: txn_version,
is_deleted: true,
untransferrable: previous_object.untransferrable,
},
)))
} else {
Ok(None)
}
}

/// This is actually not great because object owner can change. The best we can do now though.
/// This will loop forever until we get the object from the db
pub async fn get_current_object(
conn: &mut DbPoolConnection<'_>,
object_address: &str,
query_retries: u32,
query_retry_delay_ms: u64,
) -> anyhow::Result<RawCurrentObject> {
let mut tried = 0;
while tried < query_retries {
tried += 1;
match CurrentObjectQuery::get_by_address(object_address, conn).await {
Ok(res) => {
return Ok(RawCurrentObject {
object_address: res.object_address,
owner_address: res.owner_address,
state_key_hash: res.state_key_hash,
allow_ungated_transfer: res.allow_ungated_transfer,
last_guid_creation_num: res.last_guid_creation_num,
last_transaction_version: res.last_transaction_version,
is_deleted: res.is_deleted,
untransferrable: res.untransferrable,
});
},
Err(_) => {
if tried < query_retries {
tokio::time::sleep(std::time::Duration::from_millis(query_retry_delay_ms))
.await;
}
},
}
}
Err(anyhow::anyhow!("Failed to get object owner"))
}
}

impl CurrentObjectQuery {
/// TODO: Change this to a KV store
pub async fn get_by_address(
object_address: &str,
conn: &mut DbPoolConnection<'_>,
) -> diesel::QueryResult<Self> {
current_objects::table
.filter(current_objects::object_address.eq(object_address))
.first::<Self>(conn)
.await
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,14 @@

use crate::{
db::{
common::models::token_v2_models::{
raw_token_claims::TokenV1Claimed,
v2_token_utils::{TokenStandard, V2TokenEvent},
},
postgres::models::{
common::models::{
object_models::v2_object_utils::ObjectAggregatedDataMapping,
token_models::token_utils::{TokenDataIdType, TokenEvent},
token_v2_models::{
raw_token_claims::TokenV1Claimed,
v2_token_utils::{TokenStandard, V2TokenEvent},
},
},
postgres::models::token_models::token_utils::{TokenDataIdType, TokenEvent},
},
utils::util::standardize_address,
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,11 @@

use crate::{
db::{
common::models::token_v2_models::v2_token_utils::{TokenStandard, TokenV2, TokenV2Burned},
postgres::models::{
common::models::{
object_models::v2_object_utils::ObjectAggregatedDataMapping,
token_v2_models::v2_token_utils::{TokenStandard, TokenV2, TokenV2Burned},
},
postgres::models::{
resources::FromWriteResource, token_models::token_utils::TokenWriteSet,
},
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,13 @@
#![allow(clippy::unused_unit)]

use crate::{
db::postgres::models::{
default_models::move_resources::MoveResource,
object_models::v2_object_utils::ObjectAggregatedDataMapping,
resources::{COIN_ADDR, TOKEN_ADDR, TOKEN_V2_ADDR},
token_models::token_utils::NAME_LENGTH,
db::{
common::models::object_models::v2_object_utils::ObjectAggregatedDataMapping,
postgres::models::{
default_models::move_resources::MoveResource,
resources::{COIN_ADDR, TOKEN_ADDR, TOKEN_V2_ADDR},
token_models::token_utils::NAME_LENGTH,
},
},
utils::util::{standardize_address, truncate_str},
};
Expand Down
Loading

0 comments on commit 70d8577

Please sign in to comment.