Skip to content

Commit

Permalink
[parquet-sdk]migrate objects [pt2]
Browse files Browse the repository at this point in the history
  • Loading branch information
yuunlimm committed Dec 18, 2024
1 parent fdfb3a3 commit a0c8208
Show file tree
Hide file tree
Showing 19 changed files with 669 additions and 360 deletions.
140 changes: 92 additions & 48 deletions rust/processor/src/db/common/models/object_models/raw_v2_objects.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,10 @@ use super::v2_object_utils::{CurrentObjectPK, ObjectAggregatedDataMapping};
use crate::{
db::postgres::models::default_models::move_resources::MoveResource,
schema::current_objects,
utils::{database::DbPoolConnection, util::standardize_address},
utils::{
database::{DbContext, DbPoolConnection},
util::standardize_address,
},
};
use ahash::AHashMap;
use aptos_protos::transaction::v1::{DeleteResource, WriteResource};
Expand All @@ -19,6 +22,8 @@ use diesel_async::RunQueryDsl;
use field_count::FieldCount;
use serde::{Deserialize, Serialize};

const DELETED_RESOURCE_OWNER_ADDRESS: &str = "Unknown";

#[derive(Clone, Debug, Deserialize, FieldCount, Serialize)]
pub struct RawObject {
pub transaction_version: i64,
Expand All @@ -30,6 +35,7 @@ pub struct RawObject {
pub allow_ungated_transfer: bool,
pub is_deleted: bool,
pub untransferrable: bool,
pub block_timestamp: chrono::NaiveDateTime,
}

pub trait ObjectConvertible {
Expand All @@ -46,6 +52,7 @@ pub struct RawCurrentObject {
pub last_transaction_version: i64,
pub is_deleted: bool,
pub untransferrable: bool,
pub block_timestamp: chrono::NaiveDateTime,
}

pub trait CurrentObjectConvertible {
Expand Down Expand Up @@ -73,14 +80,10 @@ impl RawObject {
txn_version: i64,
write_set_change_index: i64,
object_metadata_mapping: &ObjectAggregatedDataMapping,
block_timestamp: chrono::NaiveDateTime,
) -> anyhow::Result<Option<(Self, RawCurrentObject)>> {
let address = standardize_address(&write_resource.address.to_string());
println!("address: {:?}", address);
if let Some(object_aggregated_metadata) = object_metadata_mapping.get(&address) {
println!(
"object_aggregated_metadata: {:?}",
object_aggregated_metadata
);
// do something
let object_with_metadata = object_aggregated_metadata.object.clone();
let object_core = object_with_metadata.object_core;
Expand All @@ -101,6 +104,7 @@ impl RawObject {
allow_ungated_transfer: object_core.allow_ungated_transfer,
is_deleted: false,
untransferrable,
block_timestamp,
},
RawCurrentObject {
object_address: address,
Expand All @@ -111,6 +115,7 @@ impl RawObject {
last_transaction_version: txn_version,
is_deleted: false,
untransferrable,
block_timestamp,
},
)))
} else {
Expand All @@ -126,9 +131,8 @@ impl RawObject {
txn_version: i64,
write_set_change_index: i64,
object_mapping: &AHashMap<CurrentObjectPK, RawCurrentObject>,
conn: &mut DbPoolConnection<'_>,
query_retries: u32,
query_retry_delay_ms: u64,
db_context: &mut Option<DbContext<'_>>,
block_timestamp: chrono::NaiveDateTime,
) -> anyhow::Result<Option<(Self, RawCurrentObject)>> {
if delete_resource.type_str == "0x1::object::ObjectGroup" {
let resource = MoveResource::from_delete_resource(
Expand All @@ -137,52 +141,91 @@ impl RawObject {
txn_version,
0, // Placeholder, this isn't used anyway
);
let previous_object = if let Some(object) = object_mapping.get(&resource.address) {
object.clone()
// Add a logid here to handle None conn
if db_context.is_none() {
// This is a hack to prevent the program for parquet
Ok(Some((
Self {
transaction_version: txn_version,
write_set_change_index,
object_address: resource.address.clone(),
owner_address: DELETED_RESOURCE_OWNER_ADDRESS.to_string(),
state_key_hash: resource.state_key_hash.clone(),
guid_creation_num: BigDecimal::default(),
allow_ungated_transfer: false,
is_deleted: true,
untransferrable: false,
block_timestamp,
},
RawCurrentObject {
object_address: resource.address.clone(),
owner_address: DELETED_RESOURCE_OWNER_ADDRESS.to_string(),
state_key_hash: resource.state_key_hash.clone(),
last_guid_creation_num: BigDecimal::default(),
allow_ungated_transfer: false,
last_transaction_version: txn_version,
is_deleted: true,
untransferrable: false,
block_timestamp,
},
)))

Check warning on line 171 in rust/processor/src/db/common/models/object_models/raw_v2_objects.rs

View check run for this annotation

Codecov / codecov/patch

rust/processor/src/db/common/models/object_models/raw_v2_objects.rs#L147-L171

Added lines #L147 - L171 were not covered by tests
} else {
match Self::get_current_object(
conn,
&resource.address,
query_retries,
query_retry_delay_ms,
)
.await
{
Ok(object) => object,
Err(_) => {
tracing::error!(
let previous_object = if let Some(object) = object_mapping.get(&resource.address) {
object.clone()
} else if let Some(db_context) = db_context {
match Self::get_current_object(
&mut db_context.conn,
&resource.address,
db_context.query_retries,
db_context.query_retry_delay_ms,
)
.await
{
Ok(object) => object,

Check warning on line 184 in rust/processor/src/db/common/models/object_models/raw_v2_objects.rs

View check run for this annotation

Codecov / codecov/patch

rust/processor/src/db/common/models/object_models/raw_v2_objects.rs#L184

Added line #L184 was not covered by tests
Err(_) => {
tracing::error!(
transaction_version = txn_version,
lookup_key = &resource.address,
"Missing current_object for object_address: {}. You probably should backfill db.",
resource.address,
);
return Ok(None);
return Ok(None);
},
}
} else {
tracing::error!(

Check warning on line 196 in rust/processor/src/db/common/models/object_models/raw_v2_objects.rs

View check run for this annotation

Codecov / codecov/patch

rust/processor/src/db/common/models/object_models/raw_v2_objects.rs#L196

Added line #L196 was not covered by tests
transaction_version = txn_version,
lookup_key = &resource.address,
"Connection to DB is missing. You may need to investigate",

Check warning on line 199 in rust/processor/src/db/common/models/object_models/raw_v2_objects.rs

View check run for this annotation

Codecov / codecov/patch

rust/processor/src/db/common/models/object_models/raw_v2_objects.rs#L198-L199

Added lines #L198 - L199 were not covered by tests
);
return Ok(None);

Check warning on line 201 in rust/processor/src/db/common/models/object_models/raw_v2_objects.rs

View check run for this annotation

Codecov / codecov/patch

rust/processor/src/db/common/models/object_models/raw_v2_objects.rs#L201

Added line #L201 was not covered by tests
};
Ok(Some((
Self {
transaction_version: txn_version,
write_set_change_index,
object_address: resource.address.clone(),
owner_address: previous_object.owner_address.clone(),
state_key_hash: resource.state_key_hash.clone(),
guid_creation_num: previous_object.last_guid_creation_num.clone(),
allow_ungated_transfer: previous_object.allow_ungated_transfer,
is_deleted: true,
untransferrable: previous_object.untransferrable,
block_timestamp,
},
}
};
Ok(Some((
Self {
transaction_version: txn_version,
write_set_change_index,
object_address: resource.address.clone(),
owner_address: previous_object.owner_address.clone(),
state_key_hash: resource.state_key_hash.clone(),
guid_creation_num: previous_object.last_guid_creation_num.clone(),
allow_ungated_transfer: previous_object.allow_ungated_transfer,
is_deleted: true,
untransferrable: previous_object.untransferrable,
},
RawCurrentObject {
object_address: resource.address,
owner_address: previous_object.owner_address.clone(),
state_key_hash: resource.state_key_hash,
last_guid_creation_num: previous_object.last_guid_creation_num.clone(),
allow_ungated_transfer: previous_object.allow_ungated_transfer,
last_transaction_version: txn_version,
is_deleted: true,
untransferrable: previous_object.untransferrable,
},
)))
RawCurrentObject {
object_address: resource.address,
owner_address: previous_object.owner_address.clone(),
state_key_hash: resource.state_key_hash,
last_guid_creation_num: previous_object.last_guid_creation_num.clone(),
allow_ungated_transfer: previous_object.allow_ungated_transfer,
last_transaction_version: txn_version,
is_deleted: true,
untransferrable: previous_object.untransferrable,
block_timestamp,
},
)))
}
} else {
Ok(None)
}
Expand Down Expand Up @@ -210,6 +253,7 @@ impl RawObject {
last_transaction_version: res.last_transaction_version,
is_deleted: res.is_deleted,
untransferrable: res.untransferrable,
block_timestamp: chrono::NaiveDateTime::default(), // this won't be used

Check warning on line 256 in rust/processor/src/db/common/models/object_models/raw_v2_objects.rs

View check run for this annotation

Codecov / codecov/patch

rust/processor/src/db/common/models/object_models/raw_v2_objects.rs#L256

Added line #L256 was not covered by tests
});
},
Err(_) => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use crate::{
},
schema::current_token_ownerships_v2,
utils::{
database::DbPoolConnection,
database::{DbContext, DbPoolConnection},
util::{ensure_not_negative, standardize_address},
},
};
Expand Down Expand Up @@ -259,9 +259,7 @@ impl RawTokenOwnershipV2 {
prior_nft_ownership: &AHashMap<String, NFTOwnershipV2>,
tokens_burned: &TokenV2Burned,
object_metadatas: &ObjectAggregatedDataMapping,
conn: &mut Option<DbPoolConnection<'_>>,
query_retries: u32,
query_retry_delay_ms: u64,
db_context: &mut Option<DbContext<'_>>,
) -> anyhow::Result<Option<(Self, RawCurrentTokenOwnershipV2)>> {
let token_data_id = standardize_address(&write_resource.address.to_string());
if tokens_burned
Expand Down Expand Up @@ -328,9 +326,7 @@ impl RawTokenOwnershipV2 {
txn_timestamp,
prior_nft_ownership,
tokens_burned,
conn,
query_retries,
query_retry_delay_ms,
db_context,

Check warning on line 329 in rust/processor/src/db/common/models/token_v2_models/raw_v2_token_ownerships.rs

View check run for this annotation

Codecov / codecov/patch

rust/processor/src/db/common/models/token_v2_models/raw_v2_token_ownerships.rs#L329

Added line #L329 was not covered by tests
)
.await;
}
Expand All @@ -346,9 +342,7 @@ impl RawTokenOwnershipV2 {
txn_timestamp: chrono::NaiveDateTime,
prior_nft_ownership: &AHashMap<String, NFTOwnershipV2>,
tokens_burned: &TokenV2Burned,
conn: &mut Option<DbPoolConnection<'_>>,
query_retries: u32,
query_retry_delay_ms: u64,
db_context: &mut Option<DbContext<'_>>,
) -> anyhow::Result<Option<(Self, RawCurrentTokenOwnershipV2)>> {
let token_address = standardize_address(&delete_resource.address.to_string());
Self::get_burned_nft_v2_helper(
Expand All @@ -358,9 +352,7 @@ impl RawTokenOwnershipV2 {
txn_timestamp,
prior_nft_ownership,
tokens_burned,
conn,
query_retries,
query_retry_delay_ms,
db_context,
)
.await
}
Expand All @@ -372,9 +364,7 @@ impl RawTokenOwnershipV2 {
txn_timestamp: chrono::NaiveDateTime,
prior_nft_ownership: &AHashMap<String, NFTOwnershipV2>,
tokens_burned: &TokenV2Burned,
conn: &mut Option<DbPoolConnection<'_>>,
query_retries: u32,
query_retry_delay_ms: u64,
db_context: &mut Option<DbContext<'_>>,
) -> anyhow::Result<Option<(Self, RawCurrentTokenOwnershipV2)>> {
let token_address = standardize_address(token_address);
if let Some(burn_event) = tokens_burned.get(&token_address) {
Expand All @@ -389,7 +379,7 @@ impl RawTokenOwnershipV2 {
match prior_nft_ownership.get(&token_address) {
Some(inner) => inner.owner_address.clone(),
None => {
match conn {
match db_context {

Check warning on line 382 in rust/processor/src/db/common/models/token_v2_models/raw_v2_token_ownerships.rs

View check run for this annotation

Codecov / codecov/patch

rust/processor/src/db/common/models/token_v2_models/raw_v2_token_ownerships.rs#L382

Added line #L382 was not covered by tests
None => {
// TODO: update message
tracing::error!(
Expand All @@ -399,12 +389,12 @@ impl RawTokenOwnershipV2 {
);
DEFAULT_OWNER_ADDRESS.to_string()
},
Some(ref mut conn) => {
Some(db_context) => {

Check warning on line 392 in rust/processor/src/db/common/models/token_v2_models/raw_v2_token_ownerships.rs

View check run for this annotation

Codecov / codecov/patch

rust/processor/src/db/common/models/token_v2_models/raw_v2_token_ownerships.rs#L392

Added line #L392 was not covered by tests
match CurrentTokenOwnershipV2Query::get_latest_owned_nft_by_token_data_id(
conn,
&mut db_context.conn,

Check warning on line 394 in rust/processor/src/db/common/models/token_v2_models/raw_v2_token_ownerships.rs

View check run for this annotation

Codecov / codecov/patch

rust/processor/src/db/common/models/token_v2_models/raw_v2_token_ownerships.rs#L394

Added line #L394 was not covered by tests
&token_address,
query_retries,
query_retry_delay_ms,
db_context.query_retries,
db_context.query_retry_delay_ms,

Check warning on line 397 in rust/processor/src/db/common/models/token_v2_models/raw_v2_token_ownerships.rs

View check run for this annotation

Codecov / codecov/patch

rust/processor/src/db/common/models/token_v2_models/raw_v2_token_ownerships.rs#L396-L397

Added lines #L396 - L397 were not covered by tests
)
.await
{
Expand Down
1 change: 1 addition & 0 deletions rust/processor/src/db/parquet/models/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ pub mod ans_models;
pub mod default_models;
pub mod event_models;
pub mod fungible_asset_models;
pub mod object_models;
pub mod token_v2_models;
pub mod transaction_metadata_model;
pub mod user_transaction_models;
Expand Down
Loading

0 comments on commit a0c8208

Please sign in to comment.