Skip to content

Commit

Permalink
separate parquet and postgres model files
Browse files Browse the repository at this point in the history
  • Loading branch information
yuunlimm committed Nov 18, 2024
1 parent 7a8aad3 commit 61ecc68
Show file tree
Hide file tree
Showing 129 changed files with 278 additions and 239 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use aptos_protos::transaction::v1::{transaction::TxnData, write_set_change::Chan
use diesel::{Identifiable, Insertable, Queryable};
use field_count::FieldCount;
use processor::{
db::common::models::{
db::postgres::models::{
object_models::v2_object_utils::ObjectWithMetadata, resources::FromWriteResource,
user_transactions_models::user_transactions::UserTransaction,
},
Expand Down
14 changes: 1 addition & 13 deletions rust/processor/src/db/common/models/default_models/mod.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1 @@
// Copyright © Aptos Foundation
// SPDX-License-Identifier: Apache-2.0

pub mod block_metadata_transactions;
pub mod move_resources;
pub mod move_tables;

// parquet models
pub mod parquet_move_modules;
pub mod parquet_move_resources;
pub mod parquet_move_tables;
pub mod parquet_transactions;
pub mod parquet_write_set_changes;
pub mod raw_table_items;
Original file line number Diff line number Diff line change
@@ -1,55 +1,8 @@
// Copyright © Aptos Foundation
// SPDX-License-Identifier: Apache-2.0

#![allow(clippy::extra_unused_lifetimes)]

use crate::{
schema::{current_table_items, table_items, table_metadatas},
db::postgres::models::default_models::move_tables::{CurrentTableItem, TableItem},
utils::util::{hash_str, standardize_address},
};
use aptos_protos::transaction::v1::{DeleteTableItem, WriteTableItem};
use field_count::FieldCount;
use serde::{Deserialize, Serialize};

#[derive(Clone, Debug, Deserialize, FieldCount, Identifiable, Insertable, Serialize)]
#[diesel(primary_key(table_handle, key_hash))]
#[diesel(table_name = current_table_items)]
pub struct CurrentTableItem {
pub table_handle: String,
pub key_hash: String,
pub key: String,
pub decoded_key: serde_json::Value,
pub decoded_value: Option<serde_json::Value>,
pub last_transaction_version: i64,
pub is_deleted: bool,
}

#[derive(Clone, Debug, Deserialize, FieldCount, Identifiable, Insertable, Serialize)]
#[diesel(primary_key(transaction_version, write_set_change_index))]
#[diesel(table_name = table_items)]
pub struct TableItem {
pub transaction_version: i64,
pub write_set_change_index: i64,
pub transaction_block_height: i64,
pub key: String,
pub table_handle: String,
pub decoded_key: serde_json::Value,
pub decoded_value: Option<serde_json::Value>,
pub is_deleted: bool,
}

#[derive(Clone, Debug, Deserialize, FieldCount, Identifiable, Insertable, Serialize)]
#[diesel(primary_key(handle))]
#[diesel(table_name = table_metadatas)]
pub struct TableMetadata {
pub handle: String,
pub key_type: String,
pub value_type: String,
}

pub trait TableItemConvertible {
fn from_raw(raw_item: &RawTableItem) -> Self;
}

/// RawTableItem is a struct that will be used to converted into Postgres or Parquet TableItem
pub struct RawTableItem {
Expand Down Expand Up @@ -135,56 +88,25 @@ impl RawTableItem {
},
)
}
}

impl TableItemConvertible for TableItem {
fn from_raw(raw_item: &RawTableItem) -> Self {
TableItem {
transaction_version: raw_item.txn_version,
write_set_change_index: raw_item.write_set_change_index,
transaction_block_height: raw_item.transaction_block_height,
key: raw_item.table_key.clone(),
table_handle: raw_item.table_handle.clone(),
decoded_key: serde_json::from_str(raw_item.decoded_key.as_str()).unwrap(),
decoded_value: raw_item
.decoded_value
.clone()
.map(|v| serde_json::from_str(v.as_str()).unwrap()),
is_deleted: raw_item.is_deleted,
}
}
}

impl TableItem {
pub fn from_write_table_item(
pub fn postgres_table_item_from_write_item(
write_table_item: &WriteTableItem,
write_set_change_index: i64,
transaction_version: i64,
txn_version: i64,
transaction_block_height: i64,
) -> Self {
Self {
transaction_version,
block_timestamp: chrono::NaiveDateTime,
) -> TableItem {
let (raw_table_item, _current_table_item) = RawTableItem::from_write_table_item(
write_table_item,
write_set_change_index,
txn_version,
transaction_block_height,
key: write_table_item.key.to_string(),
table_handle: standardize_address(&write_table_item.handle.to_string()),
decoded_key: serde_json::from_str(write_table_item.data.as_ref().unwrap().key.as_str())
.unwrap(),
decoded_value: serde_json::from_str(
write_table_item.data.as_ref().unwrap().value.as_str(),
)
.unwrap(),
is_deleted: false,
}
block_timestamp,
);
TableItem::from_raw(&raw_table_item)
}
}

impl TableMetadata {
pub fn from_write_table_item(table_item: &WriteTableItem) -> Self {
Self {
handle: table_item.handle.to_string(),
key_type: table_item.data.as_ref().unwrap().key_type.clone(),
value_type: table_item.data.as_ref().unwrap().value_type.clone(),
}
}
pub trait TableItemConvertible {
fn from_raw(raw_item: &RawTableItem) -> Self;
}
18 changes: 0 additions & 18 deletions rust/processor/src/db/common/models/mod.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1 @@
// Copyright © Aptos Foundation
// SPDX-License-Identifier: Apache-2.0

pub mod account_transaction_models;
pub mod ans_models;
pub mod coin_models;
pub mod default_models;
pub mod events_models;
pub mod fungible_asset_models;
pub mod ledger_info;
pub mod object_models;
pub mod processor_status;
pub mod property_map;
pub mod resources;
pub mod stake_models;
pub mod token_models;
pub mod token_v2_models;
pub mod transaction_metadata_model;
pub mod user_transactions_models;
2 changes: 2 additions & 0 deletions rust/processor/src/db/mod.rs
Original file line number Diff line number Diff line change
@@ -1 +1,3 @@
pub mod common;
pub mod parquet;
pub mod postgres;
1 change: 1 addition & 0 deletions rust/processor/src/db/parquet/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
pub mod models;
6 changes: 6 additions & 0 deletions rust/processor/src/db/parquet/models/default_models/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
// parquet models
pub mod parquet_move_modules;
pub mod parquet_move_resources;
pub mod parquet_move_tables;
pub mod parquet_transactions;
pub mod parquet_write_set_changes;
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

use crate::{
bq_analytics::generic_parquet_processor::{GetTimeStamp, HasVersion, NamedTable},
db::common::models::default_models::move_tables::{RawTableItem, TableItemConvertible},
db::common::models::default_models::raw_table_items::{RawTableItem, TableItemConvertible},
utils::util::{hash_str, standardize_address},
};
use allocative_derive::Allocative;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,10 @@
#![allow(clippy::extra_unused_lifetimes)]
#![allow(clippy::unused_unit)]

use super::{
block_metadata_transactions::BlockMetadataTransactionModel,
parquet_write_set_changes::{WriteSetChangeDetail, WriteSetChangeModel},
};
use super::parquet_write_set_changes::{WriteSetChangeDetail, WriteSetChangeModel};
use crate::{
bq_analytics::generic_parquet_processor::{GetTimeStamp, HasVersion, NamedTable},
db::postgres::models::default_models::block_metadata_transactions::BlockMetadataTransactionModel,
utils::{
counters::PROCESSOR_UNKNOWN_TYPE_COUNT,
util::{get_clean_payload, get_clean_writeset, get_payload_type, standardize_address},
Expand Down
1 change: 1 addition & 0 deletions rust/processor/src/db/parquet/models/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
pub mod default_models;
1 change: 1 addition & 0 deletions rust/processor/src/db/postgres/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
pub mod models;
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
#![allow(clippy::unused_unit)]

use crate::{
db::common::models::{
db::postgres::models::{
object_models::v2_object_utils::ObjectWithMetadata, resources::FromWriteResource,
user_transactions_models::user_transactions::UserTransaction,
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use super::{
ans_utils::{get_token_name, NameRecordV2, SetReverseLookupEvent, SubdomainExtV2},
};
use crate::{
db::common::models::token_v2_models::v2_token_utils::TokenStandard,
db::postgres::models::token_v2_models::v2_token_utils::TokenStandard,
schema::{
ans_lookup_v2, ans_primary_name_v2, current_ans_lookup_v2, current_ans_primary_name_v2,
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
#![allow(clippy::extra_unused_lifetimes)]

use crate::{
db::common::models::default_models::move_resources::MoveResource,
db::postgres::models::default_models::move_resources::MoveResource,
utils::util::{
bigdecimal_to_u64, deserialize_from_string, parse_timestamp_secs, standardize_address,
truncate_str,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

use crate::{
bq_analytics::generic_parquet_processor::{GetTimeStamp, HasVersion, NamedTable},
db::common::models::{
db::postgres::models::{
ans_models::{
ans_lookup::{AnsPrimaryName, CurrentAnsPrimaryName},
ans_utils::SetReverseLookupEvent,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use super::{
coin_utils::{CoinEvent, EventGuidResource},
};
use crate::{
db::common::models::{
db::postgres::models::{
fungible_asset_models::{
v2_fungible_asset_activities::{
CoinType, CurrentCoinBalancePK, EventToCoinType, BURN_GAS_EVENT_CREATION_NUM,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

use super::coin_utils::{CoinInfoType, CoinResource};
use crate::{
db::common::models::fungible_asset_models::v2_fungible_asset_activities::EventToCoinType,
db::postgres::models::fungible_asset_models::v2_fungible_asset_activities::EventToCoinType,
schema::{coin_balances, current_coin_balances},
utils::util::standardize_address,
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
#![allow(clippy::unused_unit)]

use crate::{
db::common::models::default_models::move_tables::TableItem,
db::common::models::default_models::raw_table_items::RawTableItem,
schema::coin_supply,
utils::util::{hash_str, APTOS_COIN_TYPE_STR},
};
Expand Down Expand Up @@ -53,8 +53,13 @@ impl CoinSupply {
}

// Convert to TableItem model. Some fields are just placeholders
let table_item_model =
TableItem::from_write_table_item(write_table_item, 0, txn_version, 0);
let table_item_model = RawTableItem::postgres_table_item_from_write_item(
write_table_item,
0,
txn_version,
0,
txn_timestamp,
);

// Return early if not aptos coin aggregator key
let table_key = table_item_model.decoded_key.as_str().unwrap();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
#![allow(clippy::extra_unused_lifetimes)]

use crate::{
db::common::models::{default_models::move_resources::MoveResource, resources::COIN_ADDR},
db::postgres::models::{default_models::move_resources::MoveResource, resources::COIN_ADDR},
utils::util::{deserialize_from_string, hash_str, standardize_address, truncate_str},
};
use anyhow::{bail, Context, Result};
Expand Down
6 changes: 6 additions & 0 deletions rust/processor/src/db/postgres/models/default_models/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
// Copyright © Aptos Foundation
// SPDX-License-Identifier: Apache-2.0

pub mod block_metadata_transactions;
pub mod move_resources;
pub mod move_tables;
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
// Copyright © Aptos Foundation
// SPDX-License-Identifier: Apache-2.0

#![allow(clippy::extra_unused_lifetimes)]

use crate::{
db::common::models::default_models::raw_table_items::{RawTableItem, TableItemConvertible},
schema::{current_table_items, table_items, table_metadatas},
};
use aptos_protos::transaction::v1::WriteTableItem;
use field_count::FieldCount;
use serde::{Deserialize, Serialize};

#[derive(Clone, Debug, Deserialize, FieldCount, Identifiable, Insertable, Serialize)]
#[diesel(primary_key(table_handle, key_hash))]
#[diesel(table_name = current_table_items)]
pub struct CurrentTableItem {
pub table_handle: String,
pub key_hash: String,
pub key: String,
pub decoded_key: serde_json::Value,
pub decoded_value: Option<serde_json::Value>,
pub last_transaction_version: i64,
pub is_deleted: bool,
}

#[derive(Clone, Debug, Deserialize, FieldCount, Identifiable, Insertable, Serialize)]
#[diesel(primary_key(transaction_version, write_set_change_index))]
#[diesel(table_name = table_items)]
pub struct TableItem {
pub transaction_version: i64,
pub write_set_change_index: i64,
pub transaction_block_height: i64,
pub key: String,
pub table_handle: String,
pub decoded_key: serde_json::Value,
pub decoded_value: Option<serde_json::Value>,
pub is_deleted: bool,
}

#[derive(Clone, Debug, Deserialize, FieldCount, Identifiable, Insertable, Serialize)]
#[diesel(primary_key(handle))]
#[diesel(table_name = table_metadatas)]
pub struct TableMetadata {
pub handle: String,
pub key_type: String,
pub value_type: String,
}

impl TableItemConvertible for TableItem {
fn from_raw(raw_item: &RawTableItem) -> Self {
TableItem {
transaction_version: raw_item.txn_version,
write_set_change_index: raw_item.write_set_change_index,
transaction_block_height: raw_item.transaction_block_height,
key: raw_item.table_key.clone(),
table_handle: raw_item.table_handle.clone(),
decoded_key: serde_json::from_str(raw_item.decoded_key.as_str()).unwrap(),
decoded_value: raw_item
.decoded_value
.clone()
.map(|v| serde_json::from_str(v.as_str()).unwrap()),
is_deleted: raw_item.is_deleted,
}
}
}

impl TableMetadata {
pub fn from_write_table_item(table_item: &WriteTableItem) -> Self {
Self {
handle: table_item.handle.to_string(),
key_type: table_item.data.as_ref().unwrap().key_type.clone(),
value_type: table_item.data.as_ref().unwrap().value_type.clone(),
}
}
}
Loading

0 comments on commit 61ecc68

Please sign in to comment.