Skip to content

Commit

Permalink
[SDK-parquet] parquet default processor extractor step (#601)
Browse files Browse the repository at this point in the history
### Description
This is to add an extractor step for parquet default processor.

Changes invovled:
- added a map to filter parquet structs.
- added a new model RawTableItem and a trait `TableItemConvertible` which is implemented for both Parquet and Postgres TableItem. 
- updated a (Postgres) `TableItem::from_write_table_item` to return `table_items` only

Context on why we added a new `RawTableItem` model:
**tl;dr**
This is to reduce the duplicated code as much as possible and reuse the common parsing logic. RawTableItem represents parsed transaction data. This struct will act as the unified output from the parsing function. Parquet TableItem and Postgres TableItem will implement a TableItemConvertible trait to convert RawTableItem into the appropriate format. 

With this approach, parsing logic is centralized, and each processor (Postgres or Parquet) can convert RawTableItem to its respective TableItem type, keeping code DRY and maintainable while supporting format-specific needs.
  • Loading branch information
yuunlimm authored Nov 18, 2024
1 parent 0e21881 commit 552ecc7
Show file tree
Hide file tree
Showing 134 changed files with 570 additions and 286 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use aptos_protos::transaction::v1::{transaction::TxnData, write_set_change::Chan
use diesel::{Identifiable, Insertable, Queryable};
use field_count::FieldCount;
use processor::{
db::common::models::{
db::postgres::models::{
object_models::v2_object_utils::ObjectWithMetadata, resources::FromWriteResource,
user_transactions_models::user_transactions::UserTransaction,
},
Expand Down
14 changes: 1 addition & 13 deletions rust/processor/src/db/common/models/default_models/mod.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1 @@
// Copyright © Aptos Foundation
// SPDX-License-Identifier: Apache-2.0

pub mod block_metadata_transactions;
pub mod move_resources;
pub mod move_tables;

// parquet models
pub mod parquet_move_modules;
pub mod parquet_move_resources;
pub mod parquet_move_tables;
pub mod parquet_transactions;
pub mod parquet_write_set_changes;
pub mod raw_table_items;
137 changes: 0 additions & 137 deletions rust/processor/src/db/common/models/default_models/move_tables.rs

This file was deleted.

112 changes: 112 additions & 0 deletions rust/processor/src/db/common/models/default_models/raw_table_items.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
use crate::{
db::postgres::models::default_models::move_tables::{CurrentTableItem, TableItem},
utils::util::{hash_str, standardize_address},
};
use aptos_protos::transaction::v1::{DeleteTableItem, WriteTableItem};

/// RawTableItem is a struct that will be used to converted into Postgres or Parquet TableItem
pub struct RawTableItem {
pub txn_version: i64,
pub block_timestamp: chrono::NaiveDateTime,
pub write_set_change_index: i64,
pub transaction_block_height: i64,
pub table_key: String,
pub table_handle: String,
pub decoded_key: String,
pub decoded_value: Option<String>,
pub is_deleted: bool,
}

impl RawTableItem {
pub fn from_write_table_item(
write_table_item: &WriteTableItem,
write_set_change_index: i64,
txn_version: i64,
transaction_block_height: i64,
block_timestamp: chrono::NaiveDateTime,
) -> (Self, CurrentTableItem) {
(
Self {
txn_version,
write_set_change_index,
transaction_block_height,
table_key: write_table_item.key.to_string(),
table_handle: standardize_address(&write_table_item.handle.to_string()),
decoded_key: write_table_item.data.as_ref().unwrap().key.clone(),
decoded_value: Some(write_table_item.data.as_ref().unwrap().value.clone()),
is_deleted: false,
block_timestamp,
},
CurrentTableItem {
table_handle: standardize_address(&write_table_item.handle.to_string()),
key_hash: hash_str(&write_table_item.key.to_string()),
key: write_table_item.key.to_string(),
decoded_key: serde_json::from_str(
write_table_item.data.as_ref().unwrap().key.as_str(),
)
.unwrap(),
decoded_value: serde_json::from_str(
write_table_item.data.as_ref().unwrap().value.as_str(),
)
.unwrap(),
last_transaction_version: txn_version,
is_deleted: false,
},
)
}

pub fn from_delete_table_item(
delete_table_item: &DeleteTableItem,
write_set_change_index: i64,
txn_version: i64,
transaction_block_height: i64,
block_timestamp: chrono::NaiveDateTime,
) -> (Self, CurrentTableItem) {
(
Self {
txn_version,
write_set_change_index,
transaction_block_height,
table_key: delete_table_item.key.to_string(),
table_handle: standardize_address(&delete_table_item.handle.to_string()),
decoded_key: delete_table_item.data.as_ref().unwrap().key.clone(),
decoded_value: None,
is_deleted: true,
block_timestamp,
},
CurrentTableItem {
table_handle: standardize_address(&delete_table_item.handle.to_string()),
key_hash: hash_str(&delete_table_item.key.to_string()),
key: delete_table_item.key.to_string(),
decoded_key: serde_json::from_str(
delete_table_item.data.as_ref().unwrap().key.as_str(),
)
.unwrap(),
decoded_value: None,
last_transaction_version: txn_version,
is_deleted: true,
},
)
}

pub fn postgres_table_item_from_write_item(
write_table_item: &WriteTableItem,
write_set_change_index: i64,
txn_version: i64,
transaction_block_height: i64,
block_timestamp: chrono::NaiveDateTime,
) -> TableItem {
let (raw_table_item, _current_table_item) = RawTableItem::from_write_table_item(
write_table_item,
write_set_change_index,
txn_version,
transaction_block_height,
block_timestamp,
);
TableItem::from_raw(&raw_table_item)
}
}

pub trait TableItemConvertible {
fn from_raw(raw_item: &RawTableItem) -> Self;
}
18 changes: 0 additions & 18 deletions rust/processor/src/db/common/models/mod.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1 @@
// Copyright © Aptos Foundation
// SPDX-License-Identifier: Apache-2.0

pub mod account_transaction_models;
pub mod ans_models;
pub mod coin_models;
pub mod default_models;
pub mod events_models;
pub mod fungible_asset_models;
pub mod ledger_info;
pub mod object_models;
pub mod processor_status;
pub mod property_map;
pub mod resources;
pub mod stake_models;
pub mod token_models;
pub mod token_v2_models;
pub mod transaction_metadata_model;
pub mod user_transactions_models;
2 changes: 2 additions & 0 deletions rust/processor/src/db/mod.rs
Original file line number Diff line number Diff line change
@@ -1 +1,3 @@
pub mod common;
pub mod parquet;
pub mod postgres;
1 change: 1 addition & 0 deletions rust/processor/src/db/parquet/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
pub mod models;
6 changes: 6 additions & 0 deletions rust/processor/src/db/parquet/models/default_models/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
// parquet models
pub mod parquet_move_modules;
pub mod parquet_move_resources;
pub mod parquet_move_tables;
pub mod parquet_transactions;
pub mod parquet_write_set_changes;
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

use crate::{
bq_analytics::generic_parquet_processor::{GetTimeStamp, HasVersion, NamedTable},
db::common::models::default_models::raw_table_items::{RawTableItem, TableItemConvertible},
utils::util::{hash_str, standardize_address},
};
use allocative_derive::Allocative;
Expand Down Expand Up @@ -164,3 +165,19 @@ impl TableMetadata {
}
}
}

impl TableItemConvertible for TableItem {
fn from_raw(raw_item: &RawTableItem) -> Self {
TableItem {
txn_version: raw_item.txn_version,
write_set_change_index: raw_item.write_set_change_index,
transaction_block_height: raw_item.transaction_block_height,
table_key: raw_item.table_key.clone(),
table_handle: raw_item.table_handle.clone(),
decoded_key: raw_item.decoded_key.clone(),
decoded_value: raw_item.decoded_value.clone(),
is_deleted: raw_item.is_deleted,
block_timestamp: raw_item.block_timestamp,
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,10 @@
#![allow(clippy::extra_unused_lifetimes)]
#![allow(clippy::unused_unit)]

use super::{
block_metadata_transactions::BlockMetadataTransactionModel,
parquet_write_set_changes::{WriteSetChangeDetail, WriteSetChangeModel},
};
use super::parquet_write_set_changes::{WriteSetChangeDetail, WriteSetChangeModel};
use crate::{
bq_analytics::generic_parquet_processor::{GetTimeStamp, HasVersion, NamedTable},
db::postgres::models::default_models::block_metadata_transactions::BlockMetadataTransactionModel,
utils::{
counters::PROCESSOR_UNKNOWN_TYPE_COUNT,
util::{get_clean_payload, get_clean_writeset, get_payload_type, standardize_address},
Expand Down Expand Up @@ -377,6 +375,7 @@ impl Transaction {
let (txn, block_metadata, mut wsc_list, mut wsc_detail_list) =
Self::from_transaction(txn);
txns.push(txn.clone());
// TODO: Remove once fully migrated
transaction_version_to_struct_count
.entry(txn.txn_version)
.and_modify(|e| *e += 1)
Expand All @@ -386,6 +385,7 @@ impl Transaction {
block_metadata_txns.push(a.clone());
}

// TODO: Remove once fully migrated
if !wsc_list.is_empty() {
transaction_version_to_struct_count
.entry(txn.txn_version)
Expand Down
1 change: 1 addition & 0 deletions rust/processor/src/db/parquet/models/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
pub mod default_models;
1 change: 1 addition & 0 deletions rust/processor/src/db/postgres/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
pub mod models;
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
#![allow(clippy::unused_unit)]

use crate::{
db::common::models::{
db::postgres::models::{
object_models::v2_object_utils::ObjectWithMetadata, resources::FromWriteResource,
user_transactions_models::user_transactions::UserTransaction,
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use super::{
ans_utils::{get_token_name, NameRecordV2, SetReverseLookupEvent, SubdomainExtV2},
};
use crate::{
db::common::models::token_v2_models::v2_token_utils::TokenStandard,
db::postgres::models::token_v2_models::v2_token_utils::TokenStandard,
schema::{
ans_lookup_v2, ans_primary_name_v2, current_ans_lookup_v2, current_ans_primary_name_v2,
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
#![allow(clippy::extra_unused_lifetimes)]

use crate::{
db::common::models::default_models::move_resources::MoveResource,
db::postgres::models::default_models::move_resources::MoveResource,
utils::util::{
bigdecimal_to_u64, deserialize_from_string, parse_timestamp_secs, standardize_address,
truncate_str,
Expand Down
Loading

0 comments on commit 552ecc7

Please sign in to comment.