Add tranche 2 default model to Parquet #450

Merged
merged 3 commits on Jul 12, 2024
Changes from 1 commit
1 change: 1 addition & 0 deletions rust/processor/src/db/common/models/default_models/mod.rs
@@ -9,6 +9,7 @@ pub mod transactions;
pub mod write_set_changes;

// parquet models
pub mod parquet_move_modules;
pub mod parquet_move_resources;
pub mod parquet_move_tables;
pub mod parquet_transactions;
155 changes: 155 additions & 0 deletions rust/processor/src/db/common/models/default_models/parquet_move_modules.rs
@@ -0,0 +1,155 @@
// Copyright © Aptos Foundation
// SPDX-License-Identifier: Apache-2.0

#![allow(clippy::extra_unused_lifetimes)]

use crate::{
bq_analytics::generic_parquet_processor::{GetTimeStamp, HasVersion, NamedTable},
utils::util::standardize_address,
};
use allocative_derive::Allocative;
use aptos_protos::transaction::v1::{
DeleteModule, MoveModule as MoveModulePB, MoveModuleBytecode, WriteModule,
};
use field_count::FieldCount;
use parquet_derive::ParquetRecordWriter;
use serde::{Deserialize, Serialize};

#[derive(
Allocative, Clone, Debug, Default, Deserialize, FieldCount, ParquetRecordWriter, Serialize,
)]
pub struct MoveModule {
pub txn_version: i64,
pub write_set_change_index: i64,
pub block_height: i64,
pub name: String,
pub address: String,
pub bytecode: Option<Vec<u8>>,
pub exposed_functions: Option<String>,
pub friends: Option<String>,
pub structs: Option<String>,
pub is_deleted: bool,
#[allocative(skip)]
pub block_timestamp: chrono::NaiveDateTime,
}

impl NamedTable for MoveModule {
const TABLE_NAME: &'static str = "move_modules";
}

impl HasVersion for MoveModule {
fn version(&self) -> i64 {
self.txn_version
}
}

impl GetTimeStamp for MoveModule {
fn get_timestamp(&self) -> chrono::NaiveDateTime {
self.block_timestamp
}
}

#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct MoveModuleByteCodeParsed {
pub address: String,
pub name: String,
pub bytecode: Vec<u8>,
pub exposed_functions: String,
pub friends: String,
pub structs: String,
}

impl MoveModule {
pub fn from_write_module(
write_module: &WriteModule,
write_set_change_index: i64,
txn_version: i64,
block_height: i64,
block_timestamp: chrono::NaiveDateTime,
) -> Self {
let parsed_data = Self::convert_move_module_bytecode(write_module.data.as_ref().unwrap());
Self {
txn_version,
write_set_change_index,
block_height,
// TODO: remove the useless_asref lint when a new clippy nightly is released.
#[allow(clippy::useless_asref)]
name: parsed_data
.clone()
.map(|d| d.name.clone())
.unwrap_or_default(),
address: standardize_address(&write_module.address.to_string()),
bytecode: parsed_data.clone().map(|d| d.bytecode.clone()),
exposed_functions: parsed_data.clone().map(|d| d.exposed_functions.clone()),
friends: parsed_data.clone().map(|d| d.friends.clone()),
structs: parsed_data.map(|d| d.structs.clone()),
is_deleted: false,
block_timestamp,
}
}

pub fn from_delete_module(
delete_module: &DeleteModule,
write_set_change_index: i64,
txn_version: i64,
block_height: i64,
block_timestamp: chrono::NaiveDateTime,
) -> Self {
Self {
txn_version,
block_height,
write_set_change_index,
// TODO: remove the useless_asref lint when a new clippy nightly is released.
#[allow(clippy::useless_asref)]
name: delete_module
.module
.clone()
.map(|d| d.name.clone())
.unwrap_or_default(),
address: standardize_address(&delete_module.address.to_string()),
bytecode: None,
exposed_functions: None,
friends: None,
structs: None,
is_deleted: true,
block_timestamp,
}
}

pub fn convert_move_module_bytecode(
mmb: &MoveModuleBytecode,
) -> Option<MoveModuleByteCodeParsed> {
mmb.abi
.as_ref()
.map(|abi| Self::convert_move_module(abi, mmb.bytecode.clone()))
}

pub fn convert_move_module(
move_module: &MoveModulePB,
bytecode: Vec<u8>,
) -> MoveModuleByteCodeParsed {
MoveModuleByteCodeParsed {
address: standardize_address(&move_module.address.to_string()),
name: move_module.name.clone(),
bytecode,
exposed_functions: move_module
.exposed_functions
.iter()
.map(|move_func| serde_json::to_value(move_func).unwrap())
.map(|value| canonical_json::to_string(&value).unwrap())
.collect(),
friends: move_module
.friends
.iter()
.map(|move_module_id| serde_json::to_value(move_module_id).unwrap())
.map(|value| canonical_json::to_string(&value).unwrap())
.collect(),
structs: move_module
.structs
.iter()
.map(|move_struct| serde_json::to_value(move_struct).unwrap())
.map(|value| canonical_json::to_string(&value).unwrap())
.collect(),
}
}
}
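For context, a minimal sketch (not part of this diff) of how these constructors might be driven from a write-set change. The helper name and the match over write_set_change::Change are assumptions, not code from this PR:

use aptos_protos::transaction::v1::write_set_change::Change;

// Hypothetical driver: build a parquet MoveModule row from a module-related
// write-set change, or return None for changes handled by other models.
pub fn module_from_change(
    change: &Change,
    write_set_change_index: i64,
    txn_version: i64,
    block_height: i64,
    block_timestamp: chrono::NaiveDateTime,
) -> Option<MoveModule> {
    match change {
        Change::WriteModule(write_module) => Some(MoveModule::from_write_module(
            write_module,
            write_set_change_index,
            txn_version,
            block_height,
            block_timestamp,
        )),
        Change::DeleteModule(delete_module) => Some(MoveModule::from_delete_module(
            delete_module,
            write_set_change_index,
            txn_version,
            block_height,
            block_timestamp,
        )),
        // Resources and table items are covered by their own parquet models.
        _ => None,
    }
}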
rust/processor/src/db/common/models/default_models/parquet_move_tables.rs
@@ -55,13 +55,33 @@ pub struct CurrentTableItem
pub last_transaction_version: i64,
pub is_deleted: bool,
}
#[derive(Clone, Debug, Deserialize, FieldCount, Serialize)]

#[derive(
Allocative, Clone, Debug, Default, Deserialize, FieldCount, Serialize, ParquetRecordWriter,
)]
pub struct TableMetadata {
pub handle: String,
pub key_type: String,
pub value_type: String,
}

impl NamedTable for TableMetadata {
const TABLE_NAME: &'static str = "table_metadatas";
}

impl HasVersion for TableMetadata {
fn version(&self) -> i64 {
0 // This is a placeholder value to avoid a compile error
}
}

impl GetTimeStamp for TableMetadata {
fn get_timestamp(&self) -> chrono::NaiveDateTime {
#[warn(deprecated)]
chrono::NaiveDateTime::default()
}
}

impl TableItem {
pub fn from_write_table_item(
write_table_item: &WriteTableItem,
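The hunk above is truncated. For context, a minimal sketch of how the new parquet TableMetadata row might be populated from a WriteTableItem, mirroring the existing non-parquet model; the constructor below is an assumption, not code from this PR:

impl TableMetadata {
    // Hypothetical constructor: take the table handle and the key/value type
    // strings straight from the write-op's decoded table data.
    pub fn from_write_table_item(table_item: &WriteTableItem) -> Self {
        let data = table_item
            .data
            .as_ref()
            .expect("WriteTableItem should carry decoded table data");
        Self {
            handle: table_item.handle.to_string(),
            key_type: data.key_type.clone(),
            value_type: data.value_type.clone(),
        }
    }
}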
rust/processor/src/db/common/models/default_models/parquet_transactions.rs
@@ -20,12 +20,15 @@ use ahash::AHashMap;
use allocative_derive::Allocative;
use aptos_protos::transaction::v1::{
transaction::{TransactionType, TxnData},
Transaction as TransactionPB, TransactionInfo,
Transaction as TransactionPB, TransactionInfo, TransactionSizeInfo, WriteOpSizeInfo,
};
use field_count::FieldCount;
use once_cell::sync::Lazy;
use parquet_derive::ParquetRecordWriter;
use serde::{Deserialize, Serialize};

// Shared empty fallback, borrowed below when a transaction has no size_info so
// the write-set helpers can always be handed a &Vec<WriteOpSizeInfo>.
static EMPTY_VEC: Lazy<Vec<WriteOpSizeInfo>> = Lazy::new(Vec::new);

#[derive(
Allocative, Clone, Debug, Default, Deserialize, FieldCount, Serialize, ParquetRecordWriter,
)]
@@ -46,6 +49,7 @@ pub struct Transaction {
pub event_root_hash: String,
pub state_checkpoint_hash: Option<String>,
pub accumulator_root_hash: String,
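// Total transaction size in bytes, taken from TransactionSizeInfo (0 when the
// transaction carries no size_info).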
pub txn_total_bytes: i64,
#[allocative(skip)]
pub block_timestamp: chrono::NaiveDateTime,
}
@@ -109,6 +113,7 @@ impl Transaction {
block_height: i64,
epoch: i64,
block_timestamp: chrono::NaiveDateTime,
txn_size_info: Option<&TransactionSizeInfo>,
) -> Self {
Self {
txn_type,
@@ -136,6 +141,8 @@ impl Transaction {
num_write_set_changes: info.changes.len() as i64,
epoch,
payload_type,
txn_total_bytes: txn_size_info
.map_or(0, |size_info| size_info.transaction_bytes as i64),
block_timestamp,
}
}
@@ -185,13 +192,22 @@ impl Transaction {
#[allow(deprecated)]
let block_timestamp = chrono::NaiveDateTime::from_timestamp_opt(timestamp.seconds, 0)
.expect("Txn Timestamp is invalid!");

let txn_size_info = transaction.size_info.as_ref();
let write_set_size_info: &Vec<WriteOpSizeInfo> = if let Some(size_info) = txn_size_info {
Collaborator suggested the following change (drop the explicit type annotation):

- let write_set_size_info: &Vec<WriteOpSizeInfo> = if let Some(size_info) = txn_size_info {
+ let write_set_size_info = if let Some(size_info) = txn_size_info {

size_info.write_op_size_info.as_ref()
} else {
&EMPTY_VEC
Collaborator: Could we use Vec::new() instead of the static variable?
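A minimal sketch of one such alternative, borrowing an empty slice instead of keeping a static Vec; this assumes the downstream write-set helpers accept &[WriteOpSizeInfo] rather than &Vec<WriteOpSizeInfo>:

// Borrow the write-op size infos when present, otherwise fall back to an
// empty slice; no static and no allocation needed.
let write_set_size_info: &[WriteOpSizeInfo] = txn_size_info
    .map(|size_info| size_info.write_op_size_info.as_slice())
    .unwrap_or(&[]);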

};

match txn_data {
TxnData::User(user_txn) => {
let (wsc, wsc_detail) = WriteSetChangeModel::from_write_set_changes(
&transaction_info.changes,
txn_version,
block_height,
block_timestamp,
write_set_size_info,
);
let request = &user_txn
.request
@@ -219,6 +235,7 @@ impl Transaction {
block_height,
epoch,
block_timestamp,
txn_size_info,
),
None,
wsc,
@@ -231,6 +248,7 @@ impl Transaction {
txn_version,
block_height,
block_timestamp,
write_set_size_info,
);
let payload = genesis_txn.payload.as_ref().unwrap();
let payload_cleaned = get_clean_writeset(payload, txn_version);
@@ -251,6 +269,7 @@ impl Transaction {
block_height,
epoch,
block_timestamp,
txn_size_info,
),
None,
wsc,
@@ -263,6 +282,7 @@ impl Transaction {
txn_version,
block_height,
block_timestamp,
write_set_size_info,
);
(
Self::from_transaction_info_with_data(
@@ -275,6 +295,7 @@ impl Transaction {
block_height,
epoch,
block_timestamp,
txn_size_info,
),
Some(BlockMetadataTransaction::from_transaction(
block_metadata_txn,
@@ -298,6 +319,7 @@ impl Transaction {
block_height,
epoch,
block_timestamp,
txn_size_info,
),
None,
vec![],
@@ -314,6 +336,7 @@ impl Transaction {
block_height,
epoch,
block_timestamp,
txn_size_info,
),
None,
vec![],
@@ -330,6 +353,7 @@ impl Transaction {
block_height,
epoch,
block_timestamp,
txn_size_info,
),
None,
vec![],