Skip to content

Commit

Permalink
[parquet-sdk]migrate objects [pt2] (#641)
Browse files Browse the repository at this point in the history
## Description
- migrate objects 
- For Parquet, we want to avoid lookups for deleted resources, so we replaced them with default values, which will be handled during the load as part of the QC pipeline.
```
                 Self {
                        transaction_version: txn_version,
                        write_set_change_index,
                        object_address: resource.address.clone(),
                        owner_address: DELETED_RESOURCE_OWNER_ADDRESS.to_string(),   <-------
                        state_key_hash: resource.state_key_hash.clone(),
                        guid_creation_num: BigDecimal::default(),       <-------
                        allow_ungated_transfer: false,     <-------
                        is_deleted: true,
                        untransferrable: false,          <-------
                        block_timestamp: chrono::NaiveDateTime::default(),      <-------
```
These fields are the ones from the previous owner.

## Test Plan
- Compared the number of rows across the DE team's BigQuery table, the testing BigQuery table, the legacy processor table, and the SDK processor table.

### objects
![Screenshot 2024-12-11 at 9 57 15 PM](https://github.com/user-attachments/assets/75943a99-2b80-4916-b54b-486d6d9b7f0f)
![Screenshot 2024-12-11 at 10 02 36 PM](https://github.com/user-attachments/assets/ee73187c-a99d-4560-b460-0ab0b0efa165)

### current objects
![Screenshot 2024-12-11 at 9 57 26 PM](https://github.com/user-attachments/assets/7aac4d67-4c19-400d-8254-bc17b619ea27)
![Screenshot 2024-12-11 at 10 05 24 PM](https://github.com/user-attachments/assets/f9e2184f-cffc-4e89-99d6-c21371c48d79)
  • Loading branch information
yuunlimm authored Dec 18, 2024
1 parent fdfb3a3 commit 0cad206
Show file tree
Hide file tree
Showing 19 changed files with 669 additions and 360 deletions.
140 changes: 92 additions & 48 deletions rust/processor/src/db/common/models/object_models/raw_v2_objects.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,10 @@ use super::v2_object_utils::{CurrentObjectPK, ObjectAggregatedDataMapping};
use crate::{
db::postgres::models::default_models::move_resources::MoveResource,
schema::current_objects,
utils::{database::DbPoolConnection, util::standardize_address},
utils::{
database::{DbContext, DbPoolConnection},
util::standardize_address,
},
};
use ahash::AHashMap;
use aptos_protos::transaction::v1::{DeleteResource, WriteResource};
Expand All @@ -19,6 +22,8 @@ use diesel_async::RunQueryDsl;
use field_count::FieldCount;
use serde::{Deserialize, Serialize};

const DELETED_RESOURCE_OWNER_ADDRESS: &str = "Unknown";

#[derive(Clone, Debug, Deserialize, FieldCount, Serialize)]
pub struct RawObject {
pub transaction_version: i64,
Expand All @@ -30,6 +35,7 @@ pub struct RawObject {
pub allow_ungated_transfer: bool,
pub is_deleted: bool,
pub untransferrable: bool,
pub block_timestamp: chrono::NaiveDateTime,
}

pub trait ObjectConvertible {
Expand All @@ -46,6 +52,7 @@ pub struct RawCurrentObject {
pub last_transaction_version: i64,
pub is_deleted: bool,
pub untransferrable: bool,
pub block_timestamp: chrono::NaiveDateTime,
}

pub trait CurrentObjectConvertible {
Expand Down Expand Up @@ -73,14 +80,10 @@ impl RawObject {
txn_version: i64,
write_set_change_index: i64,
object_metadata_mapping: &ObjectAggregatedDataMapping,
block_timestamp: chrono::NaiveDateTime,
) -> anyhow::Result<Option<(Self, RawCurrentObject)>> {
let address = standardize_address(&write_resource.address.to_string());
println!("address: {:?}", address);
if let Some(object_aggregated_metadata) = object_metadata_mapping.get(&address) {
println!(
"object_aggregated_metadata: {:?}",
object_aggregated_metadata
);
// do something
let object_with_metadata = object_aggregated_metadata.object.clone();
let object_core = object_with_metadata.object_core;
Expand All @@ -101,6 +104,7 @@ impl RawObject {
allow_ungated_transfer: object_core.allow_ungated_transfer,
is_deleted: false,
untransferrable,
block_timestamp,
},
RawCurrentObject {
object_address: address,
Expand All @@ -111,6 +115,7 @@ impl RawObject {
last_transaction_version: txn_version,
is_deleted: false,
untransferrable,
block_timestamp,
},
)))
} else {
Expand All @@ -126,9 +131,8 @@ impl RawObject {
txn_version: i64,
write_set_change_index: i64,
object_mapping: &AHashMap<CurrentObjectPK, RawCurrentObject>,
conn: &mut DbPoolConnection<'_>,
query_retries: u32,
query_retry_delay_ms: u64,
db_context: &mut Option<DbContext<'_>>,
block_timestamp: chrono::NaiveDateTime,
) -> anyhow::Result<Option<(Self, RawCurrentObject)>> {
if delete_resource.type_str == "0x1::object::ObjectGroup" {
let resource = MoveResource::from_delete_resource(
Expand All @@ -137,52 +141,91 @@ impl RawObject {
txn_version,
0, // Placeholder, this isn't used anyway
);
let previous_object = if let Some(object) = object_mapping.get(&resource.address) {
object.clone()
// Log here to handle the case where the DB connection (db_context) is None
if db_context.is_none() {
// This is a hack for the parquet path: with no DB connection available, fall back to default values instead of looking up the previous owner
Ok(Some((
Self {
transaction_version: txn_version,
write_set_change_index,
object_address: resource.address.clone(),
owner_address: DELETED_RESOURCE_OWNER_ADDRESS.to_string(),
state_key_hash: resource.state_key_hash.clone(),
guid_creation_num: BigDecimal::default(),
allow_ungated_transfer: false,
is_deleted: true,
untransferrable: false,
block_timestamp,
},
RawCurrentObject {
object_address: resource.address.clone(),
owner_address: DELETED_RESOURCE_OWNER_ADDRESS.to_string(),
state_key_hash: resource.state_key_hash.clone(),
last_guid_creation_num: BigDecimal::default(),
allow_ungated_transfer: false,
last_transaction_version: txn_version,
is_deleted: true,
untransferrable: false,
block_timestamp,
},
)))
} else {
match Self::get_current_object(
conn,
&resource.address,
query_retries,
query_retry_delay_ms,
)
.await
{
Ok(object) => object,
Err(_) => {
tracing::error!(
let previous_object = if let Some(object) = object_mapping.get(&resource.address) {
object.clone()
} else if let Some(db_context) = db_context {
match Self::get_current_object(
&mut db_context.conn,
&resource.address,
db_context.query_retries,
db_context.query_retry_delay_ms,
)
.await
{
Ok(object) => object,
Err(_) => {
tracing::error!(
transaction_version = txn_version,
lookup_key = &resource.address,
"Missing current_object for object_address: {}. You probably should backfill db.",
resource.address,
);
return Ok(None);
return Ok(None);
},
}
} else {
tracing::error!(
transaction_version = txn_version,
lookup_key = &resource.address,
"Connection to DB is missing. You may need to investigate",
);
return Ok(None);
};
Ok(Some((
Self {
transaction_version: txn_version,
write_set_change_index,
object_address: resource.address.clone(),
owner_address: previous_object.owner_address.clone(),
state_key_hash: resource.state_key_hash.clone(),
guid_creation_num: previous_object.last_guid_creation_num.clone(),
allow_ungated_transfer: previous_object.allow_ungated_transfer,
is_deleted: true,
untransferrable: previous_object.untransferrable,
block_timestamp,
},
}
};
Ok(Some((
Self {
transaction_version: txn_version,
write_set_change_index,
object_address: resource.address.clone(),
owner_address: previous_object.owner_address.clone(),
state_key_hash: resource.state_key_hash.clone(),
guid_creation_num: previous_object.last_guid_creation_num.clone(),
allow_ungated_transfer: previous_object.allow_ungated_transfer,
is_deleted: true,
untransferrable: previous_object.untransferrable,
},
RawCurrentObject {
object_address: resource.address,
owner_address: previous_object.owner_address.clone(),
state_key_hash: resource.state_key_hash,
last_guid_creation_num: previous_object.last_guid_creation_num.clone(),
allow_ungated_transfer: previous_object.allow_ungated_transfer,
last_transaction_version: txn_version,
is_deleted: true,
untransferrable: previous_object.untransferrable,
},
)))
RawCurrentObject {
object_address: resource.address,
owner_address: previous_object.owner_address.clone(),
state_key_hash: resource.state_key_hash,
last_guid_creation_num: previous_object.last_guid_creation_num.clone(),
allow_ungated_transfer: previous_object.allow_ungated_transfer,
last_transaction_version: txn_version,
is_deleted: true,
untransferrable: previous_object.untransferrable,
block_timestamp,
},
)))
}
} else {
Ok(None)
}
Expand Down Expand Up @@ -210,6 +253,7 @@ impl RawObject {
last_transaction_version: res.last_transaction_version,
is_deleted: res.is_deleted,
untransferrable: res.untransferrable,
block_timestamp: chrono::NaiveDateTime::default(), // this won't be used
});
},
Err(_) => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use crate::{
},
schema::current_token_ownerships_v2,
utils::{
database::DbPoolConnection,
database::{DbContext, DbPoolConnection},
util::{ensure_not_negative, standardize_address},
},
};
Expand Down Expand Up @@ -259,9 +259,7 @@ impl RawTokenOwnershipV2 {
prior_nft_ownership: &AHashMap<String, NFTOwnershipV2>,
tokens_burned: &TokenV2Burned,
object_metadatas: &ObjectAggregatedDataMapping,
conn: &mut Option<DbPoolConnection<'_>>,
query_retries: u32,
query_retry_delay_ms: u64,
db_context: &mut Option<DbContext<'_>>,
) -> anyhow::Result<Option<(Self, RawCurrentTokenOwnershipV2)>> {
let token_data_id = standardize_address(&write_resource.address.to_string());
if tokens_burned
Expand Down Expand Up @@ -328,9 +326,7 @@ impl RawTokenOwnershipV2 {
txn_timestamp,
prior_nft_ownership,
tokens_burned,
conn,
query_retries,
query_retry_delay_ms,
db_context,
)
.await;
}
Expand All @@ -346,9 +342,7 @@ impl RawTokenOwnershipV2 {
txn_timestamp: chrono::NaiveDateTime,
prior_nft_ownership: &AHashMap<String, NFTOwnershipV2>,
tokens_burned: &TokenV2Burned,
conn: &mut Option<DbPoolConnection<'_>>,
query_retries: u32,
query_retry_delay_ms: u64,
db_context: &mut Option<DbContext<'_>>,
) -> anyhow::Result<Option<(Self, RawCurrentTokenOwnershipV2)>> {
let token_address = standardize_address(&delete_resource.address.to_string());
Self::get_burned_nft_v2_helper(
Expand All @@ -358,9 +352,7 @@ impl RawTokenOwnershipV2 {
txn_timestamp,
prior_nft_ownership,
tokens_burned,
conn,
query_retries,
query_retry_delay_ms,
db_context,
)
.await
}
Expand All @@ -372,9 +364,7 @@ impl RawTokenOwnershipV2 {
txn_timestamp: chrono::NaiveDateTime,
prior_nft_ownership: &AHashMap<String, NFTOwnershipV2>,
tokens_burned: &TokenV2Burned,
conn: &mut Option<DbPoolConnection<'_>>,
query_retries: u32,
query_retry_delay_ms: u64,
db_context: &mut Option<DbContext<'_>>,
) -> anyhow::Result<Option<(Self, RawCurrentTokenOwnershipV2)>> {
let token_address = standardize_address(token_address);
if let Some(burn_event) = tokens_burned.get(&token_address) {
Expand All @@ -389,7 +379,7 @@ impl RawTokenOwnershipV2 {
match prior_nft_ownership.get(&token_address) {
Some(inner) => inner.owner_address.clone(),
None => {
match conn {
match db_context {
None => {
// TODO: update message
tracing::error!(
Expand All @@ -399,12 +389,12 @@ impl RawTokenOwnershipV2 {
);
DEFAULT_OWNER_ADDRESS.to_string()
},
Some(ref mut conn) => {
Some(db_context) => {
match CurrentTokenOwnershipV2Query::get_latest_owned_nft_by_token_data_id(
conn,
&mut db_context.conn,
&token_address,
query_retries,
query_retry_delay_ms,
db_context.query_retries,
db_context.query_retry_delay_ms,
)
.await
{
Expand Down
1 change: 1 addition & 0 deletions rust/processor/src/db/parquet/models/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ pub mod ans_models;
pub mod default_models;
pub mod event_models;
pub mod fungible_asset_models;
pub mod object_models;
pub mod token_v2_models;
pub mod transaction_metadata_model;
pub mod user_transaction_models;
Expand Down
Loading

0 comments on commit 0cad206

Please sign in to comment.