Skip to content

Commit

Permalink
[parquet-sdk]migrate objects [pt2]
Browse files Browse the repository at this point in the history
  • Loading branch information
yuunlimm committed Dec 16, 2024
1 parent 97eccf2 commit 5dfd840
Show file tree
Hide file tree
Showing 13 changed files with 622 additions and 299 deletions.
132 changes: 87 additions & 45 deletions rust/processor/src/db/common/models/object_models/raw_v2_objects.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ use diesel_async::RunQueryDsl;
use field_count::FieldCount;
use serde::{Deserialize, Serialize};

const DELETED_RESOURCE_OWNER_ADDRESS: &str = "Unknown";

#[derive(Clone, Debug, Deserialize, FieldCount, Serialize)]
pub struct RawObject {
pub transaction_version: i64,
Expand All @@ -30,6 +32,7 @@ pub struct RawObject {
pub allow_ungated_transfer: bool,
pub is_deleted: bool,
pub untransferrable: bool,
pub block_timestamp: chrono::NaiveDateTime,
}

pub trait ObjectConvertible {
Expand All @@ -46,6 +49,7 @@ pub struct RawCurrentObject {
pub last_transaction_version: i64,
pub is_deleted: bool,
pub untransferrable: bool,
pub block_timestamp: chrono::NaiveDateTime,
}

pub trait CurrentObjectConvertible {
Expand Down Expand Up @@ -73,14 +77,10 @@ impl RawObject {
txn_version: i64,
write_set_change_index: i64,
object_metadata_mapping: &ObjectAggregatedDataMapping,
block_timestamp: chrono::NaiveDateTime,
) -> anyhow::Result<Option<(Self, RawCurrentObject)>> {
let address = standardize_address(&write_resource.address.to_string());
println!("address: {:?}", address);
if let Some(object_aggregated_metadata) = object_metadata_mapping.get(&address) {
println!(
"object_aggregated_metadata: {:?}",
object_aggregated_metadata
);
// do something
let object_with_metadata = object_aggregated_metadata.object.clone();
let object_core = object_with_metadata.object_core;
Expand All @@ -101,6 +101,7 @@ impl RawObject {
allow_ungated_transfer: object_core.allow_ungated_transfer,
is_deleted: false,
untransferrable,
block_timestamp,
},
RawCurrentObject {
object_address: address,
Expand All @@ -111,6 +112,7 @@ impl RawObject {
last_transaction_version: txn_version,
is_deleted: false,
untransferrable,
block_timestamp,
},
)))
} else {
Expand All @@ -126,7 +128,7 @@ impl RawObject {
txn_version: i64,
write_set_change_index: i64,
object_mapping: &AHashMap<CurrentObjectPK, RawCurrentObject>,
conn: &mut DbPoolConnection<'_>,
conn: &mut Option<DbPoolConnection<'_>>,
query_retries: u32,
query_retry_delay_ms: u64,
) -> anyhow::Result<Option<(Self, RawCurrentObject)>> {
Expand All @@ -137,52 +139,91 @@ impl RawObject {
txn_version,
0, // Placeholder, this isn't used anyway
);
let previous_object = if let Some(object) = object_mapping.get(&resource.address) {
object.clone()
// Add a logid here to handle None conn
if conn.is_none() {
// This is a hack to prevent the program for parquet
Ok(Some((
Self {
transaction_version: txn_version,
write_set_change_index,
object_address: resource.address.clone(),
owner_address: DELETED_RESOURCE_OWNER_ADDRESS.to_string(),
state_key_hash: resource.state_key_hash.clone(),
guid_creation_num: BigDecimal::default(),
allow_ungated_transfer: false,
is_deleted: true,
untransferrable: false,
block_timestamp: chrono::NaiveDateTime::default(),
},
RawCurrentObject {
object_address: resource.address.clone(),
owner_address: DELETED_RESOURCE_OWNER_ADDRESS.to_string(),
state_key_hash: resource.state_key_hash.clone(),
last_guid_creation_num: BigDecimal::default(),
allow_ungated_transfer: false,
last_transaction_version: txn_version,
is_deleted: true,
untransferrable: false,
block_timestamp: chrono::NaiveDateTime::default(),
},
)))

Check warning on line 169 in rust/processor/src/db/common/models/object_models/raw_v2_objects.rs

View check run for this annotation

Codecov / codecov/patch

rust/processor/src/db/common/models/object_models/raw_v2_objects.rs#L145-L169

Added lines #L145 - L169 were not covered by tests
} else {
match Self::get_current_object(
conn,
&resource.address,
query_retries,
query_retry_delay_ms,
)
.await
{
Ok(object) => object,
Err(_) => {
tracing::error!(
let previous_object = if let Some(object) = object_mapping.get(&resource.address) {
object.clone()
} else if let Some(ref mut conn) = conn {
match Self::get_current_object(
conn,
&resource.address,
query_retries,
query_retry_delay_ms,
)
.await
{
Ok(object) => object,

Check warning on line 182 in rust/processor/src/db/common/models/object_models/raw_v2_objects.rs

View check run for this annotation

Codecov / codecov/patch

rust/processor/src/db/common/models/object_models/raw_v2_objects.rs#L182

Added line #L182 was not covered by tests
Err(_) => {
tracing::error!(
transaction_version = txn_version,
lookup_key = &resource.address,
"Missing current_object for object_address: {}. You probably should backfill db.",
resource.address,
);
return Ok(None);
return Ok(None);
},
}
} else {
tracing::error!(

Check warning on line 194 in rust/processor/src/db/common/models/object_models/raw_v2_objects.rs

View check run for this annotation

Codecov / codecov/patch

rust/processor/src/db/common/models/object_models/raw_v2_objects.rs#L194

Added line #L194 was not covered by tests
transaction_version = txn_version,
lookup_key = &resource.address,
"Connection to DB is missing. You may need to investigate",

Check warning on line 197 in rust/processor/src/db/common/models/object_models/raw_v2_objects.rs

View check run for this annotation

Codecov / codecov/patch

rust/processor/src/db/common/models/object_models/raw_v2_objects.rs#L196-L197

Added lines #L196 - L197 were not covered by tests
);
return Ok(None);

Check warning on line 199 in rust/processor/src/db/common/models/object_models/raw_v2_objects.rs

View check run for this annotation

Codecov / codecov/patch

rust/processor/src/db/common/models/object_models/raw_v2_objects.rs#L199

Added line #L199 was not covered by tests
};
Ok(Some((
Self {
transaction_version: txn_version,
write_set_change_index,
object_address: resource.address.clone(),
owner_address: previous_object.owner_address.clone(),
state_key_hash: resource.state_key_hash.clone(),
guid_creation_num: previous_object.last_guid_creation_num.clone(),
allow_ungated_transfer: previous_object.allow_ungated_transfer,
is_deleted: true,
untransferrable: previous_object.untransferrable,
block_timestamp: previous_object.block_timestamp,
},
}
};
Ok(Some((
Self {
transaction_version: txn_version,
write_set_change_index,
object_address: resource.address.clone(),
owner_address: previous_object.owner_address.clone(),
state_key_hash: resource.state_key_hash.clone(),
guid_creation_num: previous_object.last_guid_creation_num.clone(),
allow_ungated_transfer: previous_object.allow_ungated_transfer,
is_deleted: true,
untransferrable: previous_object.untransferrable,
},
RawCurrentObject {
object_address: resource.address,
owner_address: previous_object.owner_address.clone(),
state_key_hash: resource.state_key_hash,
last_guid_creation_num: previous_object.last_guid_creation_num.clone(),
allow_ungated_transfer: previous_object.allow_ungated_transfer,
last_transaction_version: txn_version,
is_deleted: true,
untransferrable: previous_object.untransferrable,
},
)))
RawCurrentObject {
object_address: resource.address,
owner_address: previous_object.owner_address.clone(),
state_key_hash: resource.state_key_hash,
last_guid_creation_num: previous_object.last_guid_creation_num.clone(),
allow_ungated_transfer: previous_object.allow_ungated_transfer,
last_transaction_version: txn_version,
is_deleted: true,
untransferrable: previous_object.untransferrable,
block_timestamp: previous_object.block_timestamp,
},
)))
}
} else {
Ok(None)
}
Expand Down Expand Up @@ -210,6 +251,7 @@ impl RawObject {
last_transaction_version: res.last_transaction_version,
is_deleted: res.is_deleted,
untransferrable: res.untransferrable,
block_timestamp: chrono::NaiveDateTime::default(), // this won't be used

Check warning on line 254 in rust/processor/src/db/common/models/object_models/raw_v2_objects.rs

View check run for this annotation

Codecov / codecov/patch

rust/processor/src/db/common/models/object_models/raw_v2_objects.rs#L254

Added line #L254 was not covered by tests
});
},
Err(_) => {
Expand Down
1 change: 1 addition & 0 deletions rust/processor/src/db/parquet/models/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ pub mod ans_models;
pub mod default_models;
pub mod event_models;
pub mod fungible_asset_models;
pub mod object_models;
pub mod token_v2_models;
pub mod transaction_metadata_model;
pub mod user_transaction_models;
Expand Down
103 changes: 83 additions & 20 deletions rust/processor/src/db/parquet/models/object_models/v2_objects.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,49 +5,112 @@
#![allow(clippy::extra_unused_lifetimes)]
#![allow(clippy::unused_unit)]

use super::v2_object_utils::{CurrentObjectPK, ObjectAggregatedDataMapping};
use crate::{
db::postgres::models::default_models::move_resources::MoveResource,
schema::{current_objects, objects},
utils::{database::DbPoolConnection, util::standardize_address},
bq_analytics::generic_parquet_processor::{GetTimeStamp, HasVersion, NamedTable},
db::common::models::object_models::raw_v2_objects::{
CurrentObjectConvertible, ObjectConvertible, RawCurrentObject, RawObject,
},
};
use ahash::AHashMap;
use aptos_protos::transaction::v1::{DeleteResource, WriteResource};
use bigdecimal::BigDecimal;
use diesel::prelude::*;
use diesel_async::RunQueryDsl;
use allocative_derive::Allocative;
use bigdecimal::ToPrimitive;
use field_count::FieldCount;
use parquet_derive::ParquetRecordWriter;
use serde::{Deserialize, Serialize};

#[derive(Clone, Debug, Deserialize, FieldCount, Identifiable, Insertable, Serialize)]
pub struct RawObject {
pub transaction_version: i64,
#[derive(
Allocative, Clone, Debug, Default, Deserialize, FieldCount, ParquetRecordWriter, Serialize,

Check warning on line 21 in rust/processor/src/db/parquet/models/object_models/v2_objects.rs

View check run for this annotation

Codecov / codecov/patch

rust/processor/src/db/parquet/models/object_models/v2_objects.rs#L21

Added line #L21 was not covered by tests
)]
pub struct Object {

Check warning on line 23 in rust/processor/src/db/parquet/models/object_models/v2_objects.rs

View check run for this annotation

Codecov / codecov/patch

rust/processor/src/db/parquet/models/object_models/v2_objects.rs#L23

Added line #L23 was not covered by tests
pub txn_version: i64,
pub write_set_change_index: i64,
pub object_address: String,
pub owner_address: String,
pub state_key_hash: String,
pub guid_creation_num: BigDecimal,
pub guid_creation_num: u64, // BigDecimal,
pub allow_ungated_transfer: bool,
pub is_deleted: bool,
pub untransferrable: bool,
#[allocative(skip)]
pub block_timestamp: chrono::NaiveDateTime,
}

pub trait ObjectConvertible {
fn from_raw(raw_item: RawObject) -> Self;
impl NamedTable for Object {
const TABLE_NAME: &'static str = "objects";
}

#[derive(Clone, Debug, Deserialize, FieldCount, Identifiable, Insertable, Serialize)]
pub struct RawCurrentObject {
impl HasVersion for Object {
fn version(&self) -> i64 {
self.txn_version
}

Check warning on line 44 in rust/processor/src/db/parquet/models/object_models/v2_objects.rs

View check run for this annotation

Codecov / codecov/patch

rust/processor/src/db/parquet/models/object_models/v2_objects.rs#L42-L44

Added lines #L42 - L44 were not covered by tests
}

impl GetTimeStamp for Object {
fn get_timestamp(&self) -> chrono::NaiveDateTime {
self.block_timestamp
}

Check warning on line 50 in rust/processor/src/db/parquet/models/object_models/v2_objects.rs

View check run for this annotation

Codecov / codecov/patch

rust/processor/src/db/parquet/models/object_models/v2_objects.rs#L48-L50

Added lines #L48 - L50 were not covered by tests
}

impl ObjectConvertible for Object {
fn from_raw(raw_item: RawObject) -> Self {
Self {
txn_version: raw_item.transaction_version,
write_set_change_index: raw_item.write_set_change_index,
object_address: raw_item.object_address,
owner_address: raw_item.owner_address,
state_key_hash: raw_item.state_key_hash,
guid_creation_num: raw_item.guid_creation_num.to_u64().unwrap(),
allow_ungated_transfer: raw_item.allow_ungated_transfer,
is_deleted: raw_item.is_deleted,
untransferrable: raw_item.untransferrable,
block_timestamp: raw_item.block_timestamp,
}
}

Check warning on line 67 in rust/processor/src/db/parquet/models/object_models/v2_objects.rs

View check run for this annotation

Codecov / codecov/patch

rust/processor/src/db/parquet/models/object_models/v2_objects.rs#L54-L67

Added lines #L54 - L67 were not covered by tests
}

#[derive(
Allocative, Clone, Debug, Default, Deserialize, FieldCount, ParquetRecordWriter, Serialize,

Check warning on line 71 in rust/processor/src/db/parquet/models/object_models/v2_objects.rs

View check run for this annotation

Codecov / codecov/patch

rust/processor/src/db/parquet/models/object_models/v2_objects.rs#L71

Added line #L71 was not covered by tests
)]
pub struct CurrentObject {

Check warning on line 73 in rust/processor/src/db/parquet/models/object_models/v2_objects.rs

View check run for this annotation

Codecov / codecov/patch

rust/processor/src/db/parquet/models/object_models/v2_objects.rs#L73

Added line #L73 was not covered by tests
pub object_address: String,
pub owner_address: String,
pub state_key_hash: String,
pub allow_ungated_transfer: bool,
pub last_guid_creation_num: BigDecimal,
pub last_guid_creation_num: u64, // BigDecimal,
pub last_transaction_version: i64,
pub is_deleted: bool,
pub untransferrable: bool,
#[allocative(skip)]
pub block_timestamp: chrono::NaiveDateTime,
}

impl NamedTable for CurrentObject {
const TABLE_NAME: &'static str = "objects";
}

impl HasVersion for CurrentObject {
fn version(&self) -> i64 {
self.last_transaction_version
}

Check warning on line 93 in rust/processor/src/db/parquet/models/object_models/v2_objects.rs

View check run for this annotation

Codecov / codecov/patch

rust/processor/src/db/parquet/models/object_models/v2_objects.rs#L91-L93

Added lines #L91 - L93 were not covered by tests
}

impl GetTimeStamp for CurrentObject {
fn get_timestamp(&self) -> chrono::NaiveDateTime {
self.block_timestamp
}

Check warning on line 99 in rust/processor/src/db/parquet/models/object_models/v2_objects.rs

View check run for this annotation

Codecov / codecov/patch

rust/processor/src/db/parquet/models/object_models/v2_objects.rs#L97-L99

Added lines #L97 - L99 were not covered by tests
}

pub trait CurrentObjectConvertible {
fn from_raw(raw_item: RawCurrentObject) -> Self;
impl CurrentObjectConvertible for CurrentObject {
fn from_raw(raw_item: RawCurrentObject) -> Self {
Self {
object_address: raw_item.object_address,
owner_address: raw_item.owner_address,
state_key_hash: raw_item.state_key_hash,
allow_ungated_transfer: raw_item.allow_ungated_transfer,
last_guid_creation_num: raw_item.last_guid_creation_num.to_u64().unwrap(),
last_transaction_version: raw_item.last_transaction_version,
is_deleted: raw_item.is_deleted,
untransferrable: raw_item.untransferrable,
block_timestamp: raw_item.block_timestamp,
}
}

Check warning on line 115 in rust/processor/src/db/parquet/models/object_models/v2_objects.rs

View check run for this annotation

Codecov / codecov/patch

rust/processor/src/db/parquet/models/object_models/v2_objects.rs#L103-L115

Added lines #L103 - L115 were not covered by tests
}
Loading

0 comments on commit 5dfd840

Please sign in to comment.