Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SDK-parquet] parquet default processor extractor step #601

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use aptos_protos::transaction::v1::{transaction::TxnData, write_set_change::Chan
use diesel::{Identifiable, Insertable, Queryable};
use field_count::FieldCount;
use processor::{
db::common::models::{
db::postgres::models::{
object_models::v2_object_utils::ObjectWithMetadata, resources::FromWriteResource,
user_transactions_models::user_transactions::UserTransaction,
},
Expand Down
14 changes: 1 addition & 13 deletions rust/processor/src/db/common/models/default_models/mod.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1 @@
// Copyright © Aptos Foundation
// SPDX-License-Identifier: Apache-2.0

pub mod block_metadata_transactions;
pub mod move_resources;
pub mod move_tables;

// parquet models
pub mod parquet_move_modules;
pub mod parquet_move_resources;
pub mod parquet_move_tables;
pub mod parquet_transactions;
pub mod parquet_write_set_changes;
pub mod raw_table_items;
137 changes: 0 additions & 137 deletions rust/processor/src/db/common/models/default_models/move_tables.rs

This file was deleted.

112 changes: 112 additions & 0 deletions rust/processor/src/db/common/models/default_models/raw_table_items.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
use crate::{
db::postgres::models::default_models::move_tables::{CurrentTableItem, TableItem},
utils::util::{hash_str, standardize_address},
};
use aptos_protos::transaction::v1::{DeleteTableItem, WriteTableItem};

/// RawTableItem is a struct that will be used to converted into Postgres or Parquet TableItem
pub struct RawTableItem {
pub txn_version: i64,
pub block_timestamp: chrono::NaiveDateTime,
pub write_set_change_index: i64,
pub transaction_block_height: i64,
pub table_key: String,
pub table_handle: String,
pub decoded_key: String,
pub decoded_value: Option<String>,
pub is_deleted: bool,
}

impl RawTableItem {
pub fn from_write_table_item(
write_table_item: &WriteTableItem,
write_set_change_index: i64,
txn_version: i64,
transaction_block_height: i64,
block_timestamp: chrono::NaiveDateTime,
) -> (Self, CurrentTableItem) {
(
Self {
txn_version,
write_set_change_index,
transaction_block_height,
table_key: write_table_item.key.to_string(),
table_handle: standardize_address(&write_table_item.handle.to_string()),
decoded_key: write_table_item.data.as_ref().unwrap().key.clone(),
decoded_value: Some(write_table_item.data.as_ref().unwrap().value.clone()),
is_deleted: false,
block_timestamp,
},
CurrentTableItem {
table_handle: standardize_address(&write_table_item.handle.to_string()),
key_hash: hash_str(&write_table_item.key.to_string()),
key: write_table_item.key.to_string(),
decoded_key: serde_json::from_str(
write_table_item.data.as_ref().unwrap().key.as_str(),
)
.unwrap(),
decoded_value: serde_json::from_str(
write_table_item.data.as_ref().unwrap().value.as_str(),
)
.unwrap(),
last_transaction_version: txn_version,
is_deleted: false,
},
)
}

pub fn from_delete_table_item(
delete_table_item: &DeleteTableItem,
write_set_change_index: i64,
txn_version: i64,
transaction_block_height: i64,
block_timestamp: chrono::NaiveDateTime,
) -> (Self, CurrentTableItem) {
(
Self {
txn_version,
write_set_change_index,
transaction_block_height,
table_key: delete_table_item.key.to_string(),
table_handle: standardize_address(&delete_table_item.handle.to_string()),
decoded_key: delete_table_item.data.as_ref().unwrap().key.clone(),
decoded_value: None,
is_deleted: true,
block_timestamp,
},
CurrentTableItem {
table_handle: standardize_address(&delete_table_item.handle.to_string()),
key_hash: hash_str(&delete_table_item.key.to_string()),
key: delete_table_item.key.to_string(),
decoded_key: serde_json::from_str(
delete_table_item.data.as_ref().unwrap().key.as_str(),
)
.unwrap(),
decoded_value: None,
last_transaction_version: txn_version,
is_deleted: true,
},
)
}

pub fn postgres_table_item_from_write_item(
write_table_item: &WriteTableItem,
write_set_change_index: i64,
txn_version: i64,
transaction_block_height: i64,
block_timestamp: chrono::NaiveDateTime,
) -> TableItem {
let (raw_table_item, _current_table_item) = RawTableItem::from_write_table_item(
write_table_item,
write_set_change_index,
txn_version,
transaction_block_height,
block_timestamp,
);
TableItem::from_raw(&raw_table_item)
}
}

pub trait TableItemConvertible {
fn from_raw(raw_item: &RawTableItem) -> Self;
}
18 changes: 0 additions & 18 deletions rust/processor/src/db/common/models/mod.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1 @@
// Copyright © Aptos Foundation
// SPDX-License-Identifier: Apache-2.0

pub mod account_transaction_models;
pub mod ans_models;
pub mod coin_models;
pub mod default_models;
pub mod events_models;
pub mod fungible_asset_models;
pub mod ledger_info;
pub mod object_models;
pub mod processor_status;
pub mod property_map;
pub mod resources;
pub mod stake_models;
pub mod token_models;
pub mod token_v2_models;
pub mod transaction_metadata_model;
pub mod user_transactions_models;
2 changes: 2 additions & 0 deletions rust/processor/src/db/mod.rs
Original file line number Diff line number Diff line change
@@ -1 +1,3 @@
pub mod common;
yuunlimm marked this conversation as resolved.
Show resolved Hide resolved
pub mod parquet;
pub mod postgres;
1 change: 1 addition & 0 deletions rust/processor/src/db/parquet/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
pub mod models;
6 changes: 6 additions & 0 deletions rust/processor/src/db/parquet/models/default_models/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
// parquet models
pub mod parquet_move_modules;
pub mod parquet_move_resources;
pub mod parquet_move_tables;
pub mod parquet_transactions;
pub mod parquet_write_set_changes;
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

use crate::{
bq_analytics::generic_parquet_processor::{GetTimeStamp, HasVersion, NamedTable},
db::common::models::default_models::raw_table_items::{RawTableItem, TableItemConvertible},
utils::util::{hash_str, standardize_address},
};
use allocative_derive::Allocative;
Expand Down Expand Up @@ -164,3 +165,19 @@ impl TableMetadata {
}
}
}

impl TableItemConvertible for TableItem {
fn from_raw(raw_item: &RawTableItem) -> Self {
TableItem {
txn_version: raw_item.txn_version,
write_set_change_index: raw_item.write_set_change_index,
transaction_block_height: raw_item.transaction_block_height,
table_key: raw_item.table_key.clone(),
table_handle: raw_item.table_handle.clone(),
decoded_key: raw_item.decoded_key.clone(),
decoded_value: raw_item.decoded_value.clone(),
is_deleted: raw_item.is_deleted,
block_timestamp: raw_item.block_timestamp,
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,10 @@
#![allow(clippy::extra_unused_lifetimes)]
#![allow(clippy::unused_unit)]

use super::{
block_metadata_transactions::BlockMetadataTransactionModel,
parquet_write_set_changes::{WriteSetChangeDetail, WriteSetChangeModel},
};
use super::parquet_write_set_changes::{WriteSetChangeDetail, WriteSetChangeModel};
use crate::{
bq_analytics::generic_parquet_processor::{GetTimeStamp, HasVersion, NamedTable},
db::postgres::models::default_models::block_metadata_transactions::BlockMetadataTransactionModel,
utils::{
counters::PROCESSOR_UNKNOWN_TYPE_COUNT,
util::{get_clean_payload, get_clean_writeset, get_payload_type, standardize_address},
Expand Down Expand Up @@ -377,6 +375,7 @@ impl Transaction {
let (txn, block_metadata, mut wsc_list, mut wsc_detail_list) =
Self::from_transaction(txn);
txns.push(txn.clone());
// TODO: Remove once fully migrated
transaction_version_to_struct_count
.entry(txn.txn_version)
.and_modify(|e| *e += 1)
Expand All @@ -386,6 +385,7 @@ impl Transaction {
block_metadata_txns.push(a.clone());
}

// TODO: Remove once fully migrated
if !wsc_list.is_empty() {
transaction_version_to_struct_count
.entry(txn.txn_version)
Expand Down
1 change: 1 addition & 0 deletions rust/processor/src/db/parquet/models/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
pub mod default_models;
1 change: 1 addition & 0 deletions rust/processor/src/db/postgres/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
pub mod models;
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
#![allow(clippy::unused_unit)]

use crate::{
db::common::models::{
db::postgres::models::{
object_models::v2_object_utils::ObjectWithMetadata, resources::FromWriteResource,
user_transactions_models::user_transactions::UserTransaction,
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use super::{
ans_utils::{get_token_name, NameRecordV2, SetReverseLookupEvent, SubdomainExtV2},
};
use crate::{
db::common::models::token_v2_models::v2_token_utils::TokenStandard,
db::postgres::models::token_v2_models::v2_token_utils::TokenStandard,
schema::{
ans_lookup_v2, ans_primary_name_v2, current_ans_lookup_v2, current_ans_primary_name_v2,
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
#![allow(clippy::extra_unused_lifetimes)]

use crate::{
db::common::models::default_models::move_resources::MoveResource,
db::postgres::models::default_models::move_resources::MoveResource,
utils::util::{
bigdecimal_to_u64, deserialize_from_string, parse_timestamp_secs, standardize_address,
truncate_str,
Expand Down
Loading
Loading