Skip to content

Commit

Permalink
Add tranche 2 default model to Parquet (#450)
Browse files Browse the repository at this point in the history
* add txn metadata info to parquet default processor

* remove static variabl

* lint
  • Loading branch information
yuunlimm authored Jul 12, 2024
1 parent 10bcfcb commit 08973de
Show file tree
Hide file tree
Showing 7 changed files with 353 additions and 61 deletions.
1 change: 1 addition & 0 deletions rust/processor/src/db/common/models/default_models/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ pub mod transactions;
pub mod write_set_changes;

// parquet models
pub mod parquet_move_modules;
pub mod parquet_move_resources;
pub mod parquet_move_tables;
pub mod parquet_transactions;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
// Copyright © Aptos Foundation
// SPDX-License-Identifier: Apache-2.0

#![allow(clippy::extra_unused_lifetimes)]

use crate::{
bq_analytics::generic_parquet_processor::{GetTimeStamp, HasVersion, NamedTable},
utils::util::standardize_address,
};
use allocative_derive::Allocative;
use aptos_protos::transaction::v1::{
DeleteModule, MoveModule as MoveModulePB, MoveModuleBytecode, WriteModule,
};
use field_count::FieldCount;
use parquet_derive::ParquetRecordWriter;
use serde::{Deserialize, Serialize};

#[derive(
Allocative, Clone, Debug, Default, Deserialize, FieldCount, ParquetRecordWriter, Serialize,
)]
pub struct MoveModule {
pub txn_version: i64,
pub write_set_change_index: i64,
pub block_height: i64,
pub name: String,
pub address: String,
pub bytecode: Option<Vec<u8>>,
pub exposed_functions: Option<String>,
pub friends: Option<String>,
pub structs: Option<String>,
pub is_deleted: bool,
#[allocative(skip)]
pub block_timestamp: chrono::NaiveDateTime,
}

impl NamedTable for MoveModule {
const TABLE_NAME: &'static str = "move_modules";
}

impl HasVersion for MoveModule {
fn version(&self) -> i64 {
self.txn_version
}
}

impl GetTimeStamp for MoveModule {
fn get_timestamp(&self) -> chrono::NaiveDateTime {
self.block_timestamp
}
}

#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct MoveModuleByteCodeParsed {
pub address: String,
pub name: String,
pub bytecode: Vec<u8>,
pub exposed_functions: String,
pub friends: String,
pub structs: String,
}

impl MoveModule {
pub fn from_write_module(
write_module: &WriteModule,
write_set_change_index: i64,
txn_version: i64,
block_height: i64,
block_timestamp: chrono::NaiveDateTime,
) -> Self {
let parsed_data = Self::convert_move_module_bytecode(write_module.data.as_ref().unwrap());
Self {
txn_version,
write_set_change_index,
block_height,
// TODO: remove the useless_asref lint when new clippy nighly is released.
#[allow(clippy::useless_asref)]
name: parsed_data
.clone()
.map(|d| d.name.clone())
.unwrap_or_default(),
address: standardize_address(&write_module.address.to_string()),
bytecode: parsed_data.clone().map(|d| d.bytecode.clone()),
exposed_functions: parsed_data.clone().map(|d| d.exposed_functions.clone()),
friends: parsed_data.clone().map(|d| d.friends.clone()),
structs: parsed_data.map(|d| d.structs.clone()),
is_deleted: false,
block_timestamp,
}
}

pub fn from_delete_module(
delete_module: &DeleteModule,
write_set_change_index: i64,
txn_version: i64,
block_height: i64,
block_timestamp: chrono::NaiveDateTime,
) -> Self {
Self {
txn_version,
block_height,
write_set_change_index,
// TODO: remove the useless_asref lint when new clippy nighly is released.
#[allow(clippy::useless_asref)]
name: delete_module
.module
.clone()
.map(|d| d.name.clone())
.unwrap_or_default(),
address: standardize_address(&delete_module.address.to_string()),
bytecode: None,
exposed_functions: None,
friends: None,
structs: None,
is_deleted: true,
block_timestamp,
}
}

pub fn convert_move_module_bytecode(
mmb: &MoveModuleBytecode,
) -> Option<MoveModuleByteCodeParsed> {
mmb.abi
.as_ref()
.map(|abi| Self::convert_move_module(abi, mmb.bytecode.clone()))
}

pub fn convert_move_module(
move_module: &MoveModulePB,
bytecode: Vec<u8>,
) -> MoveModuleByteCodeParsed {
MoveModuleByteCodeParsed {
address: standardize_address(&move_module.address.to_string()),
name: move_module.name.clone(),
bytecode,
exposed_functions: move_module
.exposed_functions
.iter()
.map(|move_func| serde_json::to_value(move_func).unwrap())
.map(|value| canonical_json::to_string(&value).unwrap())
.collect(),
friends: move_module
.friends
.iter()
.map(|move_module_id| serde_json::to_value(move_module_id).unwrap())
.map(|value| canonical_json::to_string(&value).unwrap())
.collect(),
structs: move_module
.structs
.iter()
.map(|move_struct| serde_json::to_value(move_struct).unwrap())
.map(|value| canonical_json::to_string(&value).unwrap())
.collect(),
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,13 +55,33 @@ pub struct CurrentTableItem {
pub last_transaction_version: i64,
pub is_deleted: bool,
}
#[derive(Clone, Debug, Deserialize, FieldCount, Serialize)]

#[derive(
Allocative, Clone, Debug, Default, Deserialize, FieldCount, Serialize, ParquetRecordWriter,
)]
pub struct TableMetadata {
pub handle: String,
pub key_type: String,
pub value_type: String,
}

impl NamedTable for TableMetadata {
const TABLE_NAME: &'static str = "table_metadatas";
}

impl HasVersion for TableMetadata {
fn version(&self) -> i64 {
0 // This is a placeholder value to avoid a compile error
}
}

impl GetTimeStamp for TableMetadata {
fn get_timestamp(&self) -> chrono::NaiveDateTime {
#[warn(deprecated)]
chrono::NaiveDateTime::default()
}
}

impl TableItem {
pub fn from_write_table_item(
write_table_item: &WriteTableItem,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use ahash::AHashMap;
use allocative_derive::Allocative;
use aptos_protos::transaction::v1::{
transaction::{TransactionType, TxnData},
Transaction as TransactionPB, TransactionInfo,
Transaction as TransactionPB, TransactionInfo, TransactionSizeInfo,
};
use field_count::FieldCount;
use parquet_derive::ParquetRecordWriter;
Expand All @@ -46,6 +46,7 @@ pub struct Transaction {
pub event_root_hash: String,
pub state_checkpoint_hash: Option<String>,
pub accumulator_root_hash: String,
pub txn_total_bytes: i64,
#[allocative(skip)]
pub block_timestamp: chrono::NaiveDateTime,
}
Expand Down Expand Up @@ -109,6 +110,7 @@ impl Transaction {
block_height: i64,
epoch: i64,
block_timestamp: chrono::NaiveDateTime,
txn_size_info: Option<&TransactionSizeInfo>,
) -> Self {
Self {
txn_type,
Expand Down Expand Up @@ -136,6 +138,8 @@ impl Transaction {
num_write_set_changes: info.changes.len() as i64,
epoch,
payload_type,
txn_total_bytes: txn_size_info
.map_or(0, |size_info| size_info.transaction_bytes as i64),
block_timestamp,
}
}
Expand Down Expand Up @@ -185,13 +189,21 @@ impl Transaction {
#[allow(deprecated)]
let block_timestamp = chrono::NaiveDateTime::from_timestamp_opt(timestamp.seconds, 0)
.expect("Txn Timestamp is invalid!");

let txn_size_info = transaction.size_info.as_ref();
let empty_vec = Vec::new();
let write_set_size_info = txn_size_info
.as_ref()
.map_or(&empty_vec, |size_info| &size_info.write_op_size_info);

match txn_data {
TxnData::User(user_txn) => {
let (wsc, wsc_detail) = WriteSetChangeModel::from_write_set_changes(
&transaction_info.changes,
txn_version,
block_height,
block_timestamp,
write_set_size_info,
);
let request = &user_txn
.request
Expand Down Expand Up @@ -219,6 +231,7 @@ impl Transaction {
block_height,
epoch,
block_timestamp,
txn_size_info,
),
None,
wsc,
Expand All @@ -231,6 +244,7 @@ impl Transaction {
txn_version,
block_height,
block_timestamp,
write_set_size_info,
);
let payload = genesis_txn.payload.as_ref().unwrap();
let payload_cleaned = get_clean_writeset(payload, txn_version);
Expand All @@ -251,6 +265,7 @@ impl Transaction {
block_height,
epoch,
block_timestamp,
txn_size_info,
),
None,
wsc,
Expand All @@ -263,6 +278,7 @@ impl Transaction {
txn_version,
block_height,
block_timestamp,
write_set_size_info,
);
(
Self::from_transaction_info_with_data(
Expand All @@ -275,6 +291,7 @@ impl Transaction {
block_height,
epoch,
block_timestamp,
txn_size_info,
),
Some(BlockMetadataTransaction::from_transaction(
block_metadata_txn,
Expand All @@ -298,6 +315,7 @@ impl Transaction {
block_height,
epoch,
block_timestamp,
txn_size_info,
),
None,
vec![],
Expand All @@ -314,6 +332,7 @@ impl Transaction {
block_height,
epoch,
block_timestamp,
txn_size_info,
),
None,
vec![],
Expand All @@ -330,6 +349,7 @@ impl Transaction {
block_height,
epoch,
block_timestamp,
txn_size_info,
),
None,
vec![],
Expand Down
Loading

0 comments on commit 08973de

Please sign in to comment.