Skip to content

Commit

Permalink
[parquet-sdk]migrate objects [pt1]
Browse files Browse the repository at this point in the history
  • Loading branch information
yuunlimm committed Dec 11, 2024
1 parent 6bfefc8 commit 5d38eb5
Show file tree
Hide file tree
Showing 29 changed files with 507 additions and 301 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,12 @@ use aptos_protos::transaction::v1::{transaction::TxnData, write_set_change::Chan
use diesel::{Identifiable, Insertable, Queryable};
use field_count::FieldCount;
use processor::{
db::postgres::models::{
object_models::v2_object_utils::ObjectWithMetadata, resources::FromWriteResource,
user_transactions_models::user_transactions::UserTransaction,
db::{
common::models::object_models::v2_object_utils::ObjectWithMetadata,
postgres::models::{
resources::FromWriteResource,
user_transactions_models::user_transactions::UserTransaction,
},
},
schema::account_transactions,
utils::{counters::PROCESSOR_UNKNOWN_TYPE_COUNT, util::standardize_address},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,12 @@
#![allow(clippy::unused_unit)]

use crate::{
db::postgres::models::{
object_models::v2_object_utils::ObjectWithMetadata, resources::FromWriteResource,
user_transactions_models::user_transactions::UserTransaction,
db::{
common::models::object_models::v2_object_utils::ObjectWithMetadata,
postgres::models::{
resources::FromWriteResource,
user_transactions_models::user_transactions::UserTransaction,
},
},
utils::{counters::PROCESSOR_UNKNOWN_TYPE_COUNT, util::standardize_address},
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,16 @@

use crate::{
db::{
common::models::token_v2_models::v2_token_utils::TokenStandard,
common::models::{
object_models::v2_object_utils::ObjectAggregatedDataMapping,
token_v2_models::v2_token_utils::TokenStandard,
},
postgres::models::{
coin_models::{
coin_activities::CoinActivity,
coin_utils::{CoinEvent, CoinInfoType, EventGuidResource},
},
fungible_asset_models::v2_fungible_asset_utils::{FeeStatement, FungibleAssetEvent},
object_models::v2_object_utils::ObjectAggregatedDataMapping,
},
},
utils::util::standardize_address,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,12 @@ use crate::{
db::{
common::models::{
fungible_asset_models::raw_v2_fungible_asset_activities::EventToCoinType,
object_models::v2_object_utils::ObjectAggregatedDataMapping,
token_v2_models::v2_token_utils::{TokenStandard, V2_STANDARD},
},
postgres::models::{
coin_models::coin_utils::{CoinInfoType, CoinResource},
fungible_asset_models::v2_fungible_asset_utils::FungibleAssetStore,
object_models::v2_object_utils::ObjectAggregatedDataMapping,
resources::FromWriteResource,
},
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,13 @@

use crate::{
db::{
common::models::token_v2_models::v2_token_utils::TokenStandard,
common::models::{
object_models::v2_object_utils::ObjectAggregatedDataMapping,
token_v2_models::v2_token_utils::TokenStandard,
},
postgres::models::{
coin_models::coin_utils::{CoinInfoType, CoinResource},
fungible_asset_models::v2_fungible_asset_utils::FungibleAssetMetadata,
object_models::v2_object_utils::ObjectAggregatedDataMapping,
resources::FromWriteResource,
},
},
Expand Down
1 change: 1 addition & 0 deletions rust/processor/src/db/common/models/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,5 @@ pub mod ans_models;
pub mod default_models;
pub mod event_models;
pub mod fungible_asset_models;
pub mod object_models;
pub mod token_v2_models;
5 changes: 5 additions & 0 deletions rust/processor/src/db/common/models/object_models/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
// Copyright © Aptos Foundation
// SPDX-License-Identifier: Apache-2.0

pub mod v2_object_utils;
pub mod v2_objects;
238 changes: 238 additions & 0 deletions rust/processor/src/db/common/models/object_models/v2_objects.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,238 @@
// Copyright © Aptos Foundation
// SPDX-License-Identifier: Apache-2.0

// This is required because a diesel macro makes clippy sad
#![allow(clippy::extra_unused_lifetimes)]
#![allow(clippy::unused_unit)]

use super::v2_object_utils::{CurrentObjectPK, ObjectAggregatedDataMapping};
use crate::{
db::postgres::models::default_models::move_resources::MoveResource,
schema::current_objects,
utils::{database::DbPoolConnection, util::standardize_address},
};
use ahash::AHashMap;
use aptos_protos::transaction::v1::{DeleteResource, WriteResource};
use bigdecimal::BigDecimal;
use diesel::prelude::*;
use diesel_async::RunQueryDsl;
use field_count::FieldCount;
use serde::{Deserialize, Serialize};

#[derive(Clone, Debug, Deserialize, FieldCount, Serialize)]

Check warning on line 22 in rust/processor/src/db/common/models/object_models/v2_objects.rs

View check run for this annotation

Codecov / codecov/patch

rust/processor/src/db/common/models/object_models/v2_objects.rs#L22

Added line #L22 was not covered by tests
pub struct RawObject {
pub transaction_version: i64,
pub write_set_change_index: i64,
pub object_address: String,
pub owner_address: String,
pub state_key_hash: String,
pub guid_creation_num: BigDecimal,
pub allow_ungated_transfer: bool,
pub is_deleted: bool,
pub untransferrable: bool,
}

pub trait ObjectConvertible {
fn from_raw(raw_item: RawObject) -> Self;
}

#[derive(Clone, Debug, Deserialize, FieldCount, Serialize)]

Check warning on line 39 in rust/processor/src/db/common/models/object_models/v2_objects.rs

View check run for this annotation

Codecov / codecov/patch

rust/processor/src/db/common/models/object_models/v2_objects.rs#L39

Added line #L39 was not covered by tests
pub struct RawCurrentObject {
pub object_address: String,
pub owner_address: String,
pub state_key_hash: String,
pub allow_ungated_transfer: bool,
pub last_guid_creation_num: BigDecimal,
pub last_transaction_version: i64,
pub is_deleted: bool,
pub untransferrable: bool,
}

pub trait CurrentObjectConvertible {
fn from_raw(raw_item: RawCurrentObject) -> Self;
}

#[derive(Debug, Deserialize, Identifiable, Queryable, Serialize)]

Check warning on line 55 in rust/processor/src/db/common/models/object_models/v2_objects.rs

View check run for this annotation

Codecov / codecov/patch

rust/processor/src/db/common/models/object_models/v2_objects.rs#L55

Added line #L55 was not covered by tests
#[diesel(primary_key(object_address))]
#[diesel(table_name = current_objects)]
pub struct CurrentObjectQuery {
pub object_address: String,
pub owner_address: String,
pub state_key_hash: String,
pub allow_ungated_transfer: bool,
pub last_guid_creation_num: BigDecimal,
pub last_transaction_version: i64,
pub is_deleted: bool,
pub inserted_at: chrono::NaiveDateTime,
pub untransferrable: bool,
}

impl RawObject {
pub fn from_write_resource(
write_resource: &WriteResource,
txn_version: i64,
write_set_change_index: i64,
object_metadata_mapping: &ObjectAggregatedDataMapping,
) -> anyhow::Result<Option<(Self, RawCurrentObject)>> {
let address = standardize_address(&write_resource.address.to_string());
println!("address: {:?}", address);
if let Some(object_aggregated_metadata) = object_metadata_mapping.get(&address) {
println!(
"object_aggregated_metadata: {:?}",
object_aggregated_metadata
);
// do something
let object_with_metadata = object_aggregated_metadata.object.clone();
let object_core = object_with_metadata.object_core;

let untransferrable = if object_aggregated_metadata.untransferable.as_ref().is_some() {
true

Check warning on line 89 in rust/processor/src/db/common/models/object_models/v2_objects.rs

View check run for this annotation

Codecov / codecov/patch

rust/processor/src/db/common/models/object_models/v2_objects.rs#L89

Added line #L89 was not covered by tests
} else {
!object_core.allow_ungated_transfer
};
Ok(Some((
Self {
transaction_version: txn_version,
write_set_change_index,
object_address: address.clone(),
owner_address: object_core.get_owner_address(),
state_key_hash: object_with_metadata.state_key_hash.clone(),
guid_creation_num: object_core.guid_creation_num.clone(),
allow_ungated_transfer: object_core.allow_ungated_transfer,
is_deleted: false,
untransferrable,
},
RawCurrentObject {
object_address: address,
owner_address: object_core.get_owner_address(),
state_key_hash: object_with_metadata.state_key_hash,
allow_ungated_transfer: object_core.allow_ungated_transfer,
last_guid_creation_num: object_core.guid_creation_num.clone(),
last_transaction_version: txn_version,
is_deleted: false,
untransferrable,
},
)))
} else {
Ok(None)
}
}

/// This handles the case where the entire object is deleted
/// TODO: We need to detect if an object is only partially deleted
/// using KV store
pub async fn from_delete_resource(
delete_resource: &DeleteResource,
txn_version: i64,
write_set_change_index: i64,
object_mapping: &AHashMap<CurrentObjectPK, RawCurrentObject>,
conn: &mut DbPoolConnection<'_>,
query_retries: u32,
query_retry_delay_ms: u64,
) -> anyhow::Result<Option<(Self, RawCurrentObject)>> {
if delete_resource.type_str == "0x1::object::ObjectGroup" {
let resource = MoveResource::from_delete_resource(
delete_resource,
0, // Placeholder, this isn't used anyway
txn_version,
0, // Placeholder, this isn't used anyway
);
let previous_object = if let Some(object) = object_mapping.get(&resource.address) {
object.clone()
} else {
match Self::get_current_object(
conn,
&resource.address,
query_retries,
query_retry_delay_ms,
)
.await
{
Ok(object) => object,

Check warning on line 151 in rust/processor/src/db/common/models/object_models/v2_objects.rs

View check run for this annotation

Codecov / codecov/patch

rust/processor/src/db/common/models/object_models/v2_objects.rs#L151

Added line #L151 was not covered by tests
Err(_) => {
tracing::error!(
transaction_version = txn_version,
lookup_key = &resource.address,
"Missing current_object for object_address: {}. You probably should backfill db.",

Check warning on line 156 in rust/processor/src/db/common/models/object_models/v2_objects.rs

View check run for this annotation

Codecov / codecov/patch

rust/processor/src/db/common/models/object_models/v2_objects.rs#L155-L156

Added lines #L155 - L156 were not covered by tests
resource.address,
);
return Ok(None);
},
}
};
Ok(Some((
Self {
transaction_version: txn_version,
write_set_change_index,
object_address: resource.address.clone(),
owner_address: previous_object.owner_address.clone(),
state_key_hash: resource.state_key_hash.clone(),
guid_creation_num: previous_object.last_guid_creation_num.clone(),
allow_ungated_transfer: previous_object.allow_ungated_transfer,
is_deleted: true,
untransferrable: previous_object.untransferrable,
},
RawCurrentObject {
object_address: resource.address,
owner_address: previous_object.owner_address.clone(),
state_key_hash: resource.state_key_hash,
last_guid_creation_num: previous_object.last_guid_creation_num.clone(),
allow_ungated_transfer: previous_object.allow_ungated_transfer,
last_transaction_version: txn_version,
is_deleted: true,
untransferrable: previous_object.untransferrable,
},
)))
} else {
Ok(None)

Check warning on line 187 in rust/processor/src/db/common/models/object_models/v2_objects.rs

View check run for this annotation

Codecov / codecov/patch

rust/processor/src/db/common/models/object_models/v2_objects.rs#L187

Added line #L187 was not covered by tests
}
}

/// This is actually not great because object owner can change. The best we can do now though.
/// This will loop forever until we get the object from the db
pub async fn get_current_object(
conn: &mut DbPoolConnection<'_>,
object_address: &str,
query_retries: u32,
query_retry_delay_ms: u64,
) -> anyhow::Result<RawCurrentObject> {
let mut tried = 0;
while tried < query_retries {
tried += 1;
match CurrentObjectQuery::get_by_address(object_address, conn).await {
Ok(res) => {
return Ok(RawCurrentObject {
object_address: res.object_address,
owner_address: res.owner_address,
state_key_hash: res.state_key_hash,
allow_ungated_transfer: res.allow_ungated_transfer,
last_guid_creation_num: res.last_guid_creation_num,
last_transaction_version: res.last_transaction_version,
is_deleted: res.is_deleted,
untransferrable: res.untransferrable,
});

Check warning on line 213 in rust/processor/src/db/common/models/object_models/v2_objects.rs

View check run for this annotation

Codecov / codecov/patch

rust/processor/src/db/common/models/object_models/v2_objects.rs#L203-L213

Added lines #L203 - L213 were not covered by tests
},
Err(_) => {
if tried < query_retries {
tokio::time::sleep(std::time::Duration::from_millis(query_retry_delay_ms))
.await;

Check warning on line 218 in rust/processor/src/db/common/models/object_models/v2_objects.rs

View check run for this annotation

Codecov / codecov/patch

rust/processor/src/db/common/models/object_models/v2_objects.rs#L217-L218

Added lines #L217 - L218 were not covered by tests
}
},
}
}
Err(anyhow::anyhow!("Failed to get object owner"))
}
}

impl CurrentObjectQuery {
/// TODO: Change this to a KV store
pub async fn get_by_address(
object_address: &str,
conn: &mut DbPoolConnection<'_>,
) -> diesel::QueryResult<Self> {
current_objects::table
.filter(current_objects::object_address.eq(object_address))
.first::<Self>(conn)
.await
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,14 @@

use crate::{
db::{
common::models::token_v2_models::{
raw_token_claims::TokenV1Claimed,
v2_token_utils::{TokenStandard, V2TokenEvent},
},
postgres::models::{
common::models::{
object_models::v2_object_utils::ObjectAggregatedDataMapping,
token_models::token_utils::{TokenDataIdType, TokenEvent},
token_v2_models::{
raw_token_claims::TokenV1Claimed,
v2_token_utils::{TokenStandard, V2TokenEvent},
},
},
postgres::models::token_models::token_utils::{TokenDataIdType, TokenEvent},
},
utils::util::standardize_address,
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,11 @@

use crate::{
db::{
common::models::token_v2_models::v2_token_utils::{TokenStandard, TokenV2, TokenV2Burned},
postgres::models::{
common::models::{
object_models::v2_object_utils::ObjectAggregatedDataMapping,
token_v2_models::v2_token_utils::{TokenStandard, TokenV2, TokenV2Burned},
},
postgres::models::{
resources::FromWriteResource, token_models::token_utils::TokenWriteSet,
},
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,13 @@
#![allow(clippy::unused_unit)]

use crate::{
db::postgres::models::{
default_models::move_resources::MoveResource,
object_models::v2_object_utils::ObjectAggregatedDataMapping,
resources::{COIN_ADDR, TOKEN_ADDR, TOKEN_V2_ADDR},
token_models::token_utils::NAME_LENGTH,
db::{
common::models::object_models::v2_object_utils::ObjectAggregatedDataMapping,
postgres::models::{
default_models::move_resources::MoveResource,
resources::{COIN_ADDR, TOKEN_ADDR, TOKEN_V2_ADDR},
token_models::token_utils::NAME_LENGTH,
},
},
utils::util::{standardize_address, truncate_str},
};
Expand Down
Loading

0 comments on commit 5d38eb5

Please sign in to comment.