Skip to content

Commit

Permalink
[parquet-sdk]migrate objects [pt2]
Browse files Browse the repository at this point in the history
  • Loading branch information
yuunlimm committed Dec 12, 2024
1 parent 055afb7 commit 50c3256
Show file tree
Hide file tree
Showing 13 changed files with 623 additions and 299 deletions.
130 changes: 86 additions & 44 deletions rust/processor/src/db/common/models/object_models/v2_objects.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ use diesel_async::RunQueryDsl;
use field_count::FieldCount;
use serde::{Deserialize, Serialize};

const DELETED_RESOURCE_OWNER_ADDRESS: &str = "Unknown";

#[derive(Clone, Debug, Deserialize, FieldCount, Serialize)]
pub struct RawObject {
pub transaction_version: i64,
Expand All @@ -30,6 +32,7 @@ pub struct RawObject {
pub allow_ungated_transfer: bool,
pub is_deleted: bool,
pub untransferrable: bool,
pub block_timestamp: chrono::NaiveDateTime,
}

pub trait ObjectConvertible {
Expand All @@ -46,6 +49,7 @@ pub struct RawCurrentObject {
pub last_transaction_version: i64,
pub is_deleted: bool,
pub untransferrable: bool,
pub block_timestamp: chrono::NaiveDateTime,
}

pub trait CurrentObjectConvertible {
Expand Down Expand Up @@ -73,14 +77,10 @@ impl RawObject {
txn_version: i64,
write_set_change_index: i64,
object_metadata_mapping: &ObjectAggregatedDataMapping,
block_timestamp: chrono::NaiveDateTime,
) -> anyhow::Result<Option<(Self, RawCurrentObject)>> {
let address = standardize_address(&write_resource.address.to_string());
println!("address: {:?}", address);
if let Some(object_aggregated_metadata) = object_metadata_mapping.get(&address) {
println!(
"object_aggregated_metadata: {:?}",
object_aggregated_metadata
);
// do something
let object_with_metadata = object_aggregated_metadata.object.clone();
let object_core = object_with_metadata.object_core;
Expand All @@ -101,6 +101,7 @@ impl RawObject {
allow_ungated_transfer: object_core.allow_ungated_transfer,
is_deleted: false,
untransferrable,
block_timestamp,
},
RawCurrentObject {
object_address: address,
Expand All @@ -111,6 +112,7 @@ impl RawObject {
last_transaction_version: txn_version,
is_deleted: false,
untransferrable,
block_timestamp,
},
)))
} else {
Expand All @@ -126,7 +128,7 @@ impl RawObject {
txn_version: i64,
write_set_change_index: i64,
object_mapping: &AHashMap<CurrentObjectPK, RawCurrentObject>,
conn: &mut DbPoolConnection<'_>,
conn: &mut Option<DbPoolConnection<'_>>,
query_retries: u32,
query_retry_delay_ms: u64,
) -> anyhow::Result<Option<(Self, RawCurrentObject)>> {
Expand All @@ -137,52 +139,91 @@ impl RawObject {
txn_version,
0, // Placeholder, this isn't used anyway
);
let previous_object = if let Some(object) = object_mapping.get(&resource.address) {
object.clone()
// Add a logid here to handle None conn
if conn.is_none() {
// This is a hack to prevent the program for parquet
Ok(Some((
Self {
transaction_version: txn_version,
write_set_change_index,
object_address: resource.address.clone(),
owner_address: DELETED_RESOURCE_OWNER_ADDRESS.to_string(),
state_key_hash: resource.state_key_hash.clone(),
guid_creation_num: BigDecimal::default(),
allow_ungated_transfer: false,
is_deleted: true,
untransferrable: false,
block_timestamp: chrono::NaiveDateTime::default(),
},
RawCurrentObject {
object_address: resource.address.clone(),
owner_address: DELETED_RESOURCE_OWNER_ADDRESS.to_string(),
state_key_hash: resource.state_key_hash.clone(),
last_guid_creation_num: BigDecimal::default(),
allow_ungated_transfer: false,
last_transaction_version: txn_version,
is_deleted: true,
untransferrable: false,
block_timestamp: chrono::NaiveDateTime::default(),
},
)))

Check warning on line 169 in rust/processor/src/db/common/models/object_models/v2_objects.rs

View check run for this annotation

Codecov / codecov/patch

rust/processor/src/db/common/models/object_models/v2_objects.rs#L145-L169

Added lines #L145 - L169 were not covered by tests
} else {
match Self::get_current_object(
conn,
&resource.address,
query_retries,
query_retry_delay_ms,
)
.await
{
Ok(object) => object,
Err(_) => {
tracing::error!(
let previous_object = if let Some(object) = object_mapping.get(&resource.address) {
object.clone()
} else if let Some(ref mut conn) = conn {
match Self::get_current_object(
conn,
&resource.address,
query_retries,
query_retry_delay_ms,
)
.await
{
Ok(object) => object,

Check warning on line 182 in rust/processor/src/db/common/models/object_models/v2_objects.rs

View check run for this annotation

Codecov / codecov/patch

rust/processor/src/db/common/models/object_models/v2_objects.rs#L182

Added line #L182 was not covered by tests
Err(_) => {
tracing::error!(
transaction_version = txn_version,
lookup_key = &resource.address,
"Missing current_object for object_address: {}. You probably should backfill db.",
resource.address,
);
return Ok(None);
},
}
} else {
tracing::error!(

Check warning on line 194 in rust/processor/src/db/common/models/object_models/v2_objects.rs

View check run for this annotation

Codecov / codecov/patch

rust/processor/src/db/common/models/object_models/v2_objects.rs#L194

Added line #L194 was not covered by tests
transaction_version = txn_version,
lookup_key = &resource.address,
"Connection to DB is missing. You may need to investigate",

Check warning on line 197 in rust/processor/src/db/common/models/object_models/v2_objects.rs

View check run for this annotation

Codecov / codecov/patch

rust/processor/src/db/common/models/object_models/v2_objects.rs#L196-L197

Added lines #L196 - L197 were not covered by tests
);
return Ok(None);
};
Ok(Some((
Self {
transaction_version: txn_version,
write_set_change_index,
object_address: resource.address.clone(),
owner_address: previous_object.owner_address.clone(),
state_key_hash: resource.state_key_hash.clone(),
guid_creation_num: previous_object.last_guid_creation_num.clone(),
allow_ungated_transfer: previous_object.allow_ungated_transfer,
is_deleted: true,
untransferrable: previous_object.untransferrable,
block_timestamp: previous_object.block_timestamp,
},
}
};
Ok(Some((
Self {
transaction_version: txn_version,
write_set_change_index,
object_address: resource.address.clone(),
owner_address: previous_object.owner_address.clone(),
state_key_hash: resource.state_key_hash.clone(),
guid_creation_num: previous_object.last_guid_creation_num.clone(),
allow_ungated_transfer: previous_object.allow_ungated_transfer,
is_deleted: true,
untransferrable: previous_object.untransferrable,
},
RawCurrentObject {
object_address: resource.address,
owner_address: previous_object.owner_address.clone(),
state_key_hash: resource.state_key_hash,
last_guid_creation_num: previous_object.last_guid_creation_num.clone(),
allow_ungated_transfer: previous_object.allow_ungated_transfer,
last_transaction_version: txn_version,
is_deleted: true,
untransferrable: previous_object.untransferrable,
},
)))
RawCurrentObject {
object_address: resource.address,
owner_address: previous_object.owner_address.clone(),
state_key_hash: resource.state_key_hash,
last_guid_creation_num: previous_object.last_guid_creation_num.clone(),
allow_ungated_transfer: previous_object.allow_ungated_transfer,
last_transaction_version: txn_version,
is_deleted: true,
untransferrable: previous_object.untransferrable,
block_timestamp: previous_object.block_timestamp,
},
)))
}
} else {
Ok(None)
}
Expand Down Expand Up @@ -210,6 +251,7 @@ impl RawObject {
last_transaction_version: res.last_transaction_version,
is_deleted: res.is_deleted,
untransferrable: res.untransferrable,
block_timestamp: chrono::NaiveDateTime::default(), // this won't be used

Check warning on line 254 in rust/processor/src/db/common/models/object_models/v2_objects.rs

View check run for this annotation

Codecov / codecov/patch

rust/processor/src/db/common/models/object_models/v2_objects.rs#L254

Added line #L254 was not covered by tests
});
},
Err(_) => {
Expand Down
1 change: 1 addition & 0 deletions rust/processor/src/db/parquet/models/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,4 @@ pub mod fungible_asset_models;
pub mod token_v2_models;
pub mod transaction_metadata_model;
pub mod user_transaction_models;
pub mod object_models;
110 changes: 88 additions & 22 deletions rust/processor/src/db/parquet/models/object_models/v2_objects.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,49 +5,115 @@
#![allow(clippy::extra_unused_lifetimes)]
#![allow(clippy::unused_unit)]

use super::v2_object_utils::{CurrentObjectPK, ObjectAggregatedDataMapping};
use crate::{
db::postgres::models::default_models::move_resources::MoveResource,
schema::{current_objects, objects},
utils::{database::DbPoolConnection, util::standardize_address},
};
use ahash::AHashMap;
use aptos_protos::transaction::v1::{DeleteResource, WriteResource};
use bigdecimal::BigDecimal;
use diesel::prelude::*;
use diesel_async::RunQueryDsl;
use bigdecimal::{ToPrimitive};
use field_count::FieldCount;
use serde::{Deserialize, Serialize};
use allocative_derive::Allocative;
use parquet_derive::ParquetRecordWriter;
use crate::bq_analytics::generic_parquet_processor::NamedTable;
use crate::bq_analytics::generic_parquet_processor::HasVersion;
use crate::bq_analytics::generic_parquet_processor::GetTimeStamp;
use crate::db::common::models::object_models::v2_objects::ObjectConvertible;
use crate::db::common::models::object_models::v2_objects::CurrentObjectConvertible;
use crate::db::common::models::object_models::v2_objects::RawObject;
use crate::db::common::models::object_models::v2_objects::RawCurrentObject;

#[derive(Clone, Debug, Deserialize, FieldCount, Identifiable, Insertable, Serialize)]
pub struct RawObject {
pub transaction_version: i64,
#[derive(
Allocative, Clone, Debug, Default, Deserialize, FieldCount, ParquetRecordWriter, Serialize,

Check warning on line 22 in rust/processor/src/db/parquet/models/object_models/v2_objects.rs

View check run for this annotation

Codecov / codecov/patch

rust/processor/src/db/parquet/models/object_models/v2_objects.rs#L22

Added line #L22 was not covered by tests
)]
pub struct Object {

Check warning on line 24 in rust/processor/src/db/parquet/models/object_models/v2_objects.rs

View check run for this annotation

Codecov / codecov/patch

rust/processor/src/db/parquet/models/object_models/v2_objects.rs#L24

Added line #L24 was not covered by tests
pub txn_version: i64,
pub write_set_change_index: i64,
pub object_address: String,
pub owner_address: String,
pub state_key_hash: String,
pub guid_creation_num: BigDecimal,
pub guid_creation_num: u64, // BigDecimal,
pub allow_ungated_transfer: bool,
pub is_deleted: bool,
pub untransferrable: bool,
#[allocative(skip)]
pub block_timestamp: chrono::NaiveDateTime,
}

pub trait ObjectConvertible {
fn from_raw(raw_item: RawObject) -> Self;
impl NamedTable for Object {
const TABLE_NAME: &'static str = "objects";
}

#[derive(Clone, Debug, Deserialize, FieldCount, Identifiable, Insertable, Serialize)]
pub struct RawCurrentObject {
impl HasVersion for Object {
fn version(&self) -> i64 {
self.txn_version
}

Check warning on line 45 in rust/processor/src/db/parquet/models/object_models/v2_objects.rs

View check run for this annotation

Codecov / codecov/patch

rust/processor/src/db/parquet/models/object_models/v2_objects.rs#L43-L45

Added lines #L43 - L45 were not covered by tests
}

impl GetTimeStamp for Object {
fn get_timestamp(&self) -> chrono::NaiveDateTime {
self.block_timestamp
}

Check warning on line 51 in rust/processor/src/db/parquet/models/object_models/v2_objects.rs

View check run for this annotation

Codecov / codecov/patch

rust/processor/src/db/parquet/models/object_models/v2_objects.rs#L49-L51

Added lines #L49 - L51 were not covered by tests
}

impl ObjectConvertible for Object {
fn from_raw(raw_item: RawObject) -> Self {
Self {
txn_version: raw_item.transaction_version,
write_set_change_index: raw_item.write_set_change_index,
object_address: raw_item.object_address,
owner_address: raw_item.owner_address,
state_key_hash: raw_item.state_key_hash,
guid_creation_num: raw_item.guid_creation_num.to_u64().unwrap(),
allow_ungated_transfer: raw_item.allow_ungated_transfer,
is_deleted: raw_item.is_deleted,
untransferrable: raw_item.untransferrable,
block_timestamp: raw_item.block_timestamp,
}
}

Check warning on line 68 in rust/processor/src/db/parquet/models/object_models/v2_objects.rs

View check run for this annotation

Codecov / codecov/patch

rust/processor/src/db/parquet/models/object_models/v2_objects.rs#L55-L68

Added lines #L55 - L68 were not covered by tests
}

#[derive(
Allocative, Clone, Debug, Default, Deserialize, FieldCount, ParquetRecordWriter, Serialize,

Check warning on line 72 in rust/processor/src/db/parquet/models/object_models/v2_objects.rs

View check run for this annotation

Codecov / codecov/patch

rust/processor/src/db/parquet/models/object_models/v2_objects.rs#L72

Added line #L72 was not covered by tests
)]
pub struct CurrentObject {

Check warning on line 74 in rust/processor/src/db/parquet/models/object_models/v2_objects.rs

View check run for this annotation

Codecov / codecov/patch

rust/processor/src/db/parquet/models/object_models/v2_objects.rs#L74

Added line #L74 was not covered by tests
pub object_address: String,
pub owner_address: String,
pub state_key_hash: String,
pub allow_ungated_transfer: bool,
pub last_guid_creation_num: BigDecimal,
pub last_guid_creation_num: u64, // BigDecimal,
pub last_transaction_version: i64,
pub is_deleted: bool,
pub untransferrable: bool,
#[allocative(skip)]
pub block_timestamp: chrono::NaiveDateTime,
}


impl NamedTable for CurrentObject {
const TABLE_NAME: &'static str = "objects";
}

impl HasVersion for CurrentObject {
fn version(&self) -> i64 {
self.last_transaction_version
}

Check warning on line 95 in rust/processor/src/db/parquet/models/object_models/v2_objects.rs

View check run for this annotation

Codecov / codecov/patch

rust/processor/src/db/parquet/models/object_models/v2_objects.rs#L93-L95

Added lines #L93 - L95 were not covered by tests
}

pub trait CurrentObjectConvertible {
fn from_raw(raw_item: RawCurrentObject) -> Self;
impl GetTimeStamp for CurrentObject {
fn get_timestamp(&self) -> chrono::NaiveDateTime {
self.block_timestamp
}

Check warning on line 101 in rust/processor/src/db/parquet/models/object_models/v2_objects.rs

View check run for this annotation

Codecov / codecov/patch

rust/processor/src/db/parquet/models/object_models/v2_objects.rs#L99-L101

Added lines #L99 - L101 were not covered by tests
}


impl CurrentObjectConvertible for CurrentObject {
fn from_raw(raw_item: RawCurrentObject) -> Self {
Self {
object_address: raw_item.object_address,
owner_address: raw_item.owner_address,
state_key_hash: raw_item.state_key_hash,
allow_ungated_transfer: raw_item.allow_ungated_transfer,
last_guid_creation_num: raw_item.last_guid_creation_num.to_u64().unwrap(),
last_transaction_version: raw_item.last_transaction_version,
is_deleted: raw_item.is_deleted,
untransferrable: raw_item.untransferrable,
block_timestamp: raw_item.block_timestamp,
}
}

Check warning on line 118 in rust/processor/src/db/parquet/models/object_models/v2_objects.rs

View check run for this annotation

Codecov / codecov/patch

rust/processor/src/db/parquet/models/object_models/v2_objects.rs#L106-L118

Added lines #L106 - L118 were not covered by tests
}
Loading

0 comments on commit 50c3256

Please sign in to comment.