Skip to content

Commit

Permalink
Fix parquet default processor (#458)
Browse files Browse the repository at this point in the history
* add log

* remove size info from write set changes
  • Loading branch information
yuunlimm authored Jul 15, 2024
1 parent 7e24dd8 commit fbffc33
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 67 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -191,10 +191,6 @@ impl Transaction {
.expect("Txn Timestamp is invalid!");

let txn_size_info = transaction.size_info.as_ref();
let empty_vec = Vec::new();
let write_set_size_info = txn_size_info
.as_ref()
.map_or(&empty_vec, |size_info| &size_info.write_op_size_info);

match txn_data {
TxnData::User(user_txn) => {
Expand All @@ -203,7 +199,6 @@ impl Transaction {
txn_version,
block_height,
block_timestamp,
write_set_size_info,
);
let request = &user_txn
.request
Expand Down Expand Up @@ -244,7 +239,6 @@ impl Transaction {
txn_version,
block_height,
block_timestamp,
write_set_size_info,
);
let payload = genesis_txn.payload.as_ref().unwrap();
let payload_cleaned = get_clean_writeset(payload, txn_version);
Expand Down Expand Up @@ -278,7 +272,6 @@ impl Transaction {
txn_version,
block_height,
block_timestamp,
write_set_size_info,
);
(
Self::from_transaction_info_with_data(
Expand Down Expand Up @@ -327,7 +320,6 @@ impl Transaction {
txn_version,
block_height,
block_timestamp,
write_set_size_info,
);
(
Self::from_transaction_info_with_data(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,9 @@ use allocative_derive::Allocative;
use anyhow::Context;
use aptos_protos::transaction::v1::{
write_set_change::{Change as WriteSetChangeEnum, Type as WriteSetChangeTypeEnum},
WriteOpSizeInfo, WriteSetChange as WriteSetChangePB,
WriteSetChange as WriteSetChangePB,
};
use field_count::FieldCount;
use itertools::Itertools;
use parquet_derive::ParquetRecordWriter;
use serde::{Deserialize, Serialize};

Expand All @@ -33,9 +32,6 @@ pub struct WriteSetChange {
pub change_type: String,
pub resource_address: String,
pub block_height: i64,
pub key_bytes: i64,
pub value_bytes: i64,
pub total_bytes: i64,
#[allocative(skip)]
pub block_timestamp: chrono::NaiveDateTime,
}
Expand Down Expand Up @@ -63,7 +59,6 @@ impl WriteSetChange {
txn_version: i64,
block_height: i64,
block_timestamp: chrono::NaiveDateTime,
write_set_size_info: &WriteOpSizeInfo,
) -> anyhow::Result<Option<(Self, WriteSetChangeDetail)>> {
let change_type = Self::get_write_set_change_type(write_set_change);
let change = write_set_change
Expand All @@ -78,10 +73,6 @@ impl WriteSetChange {
hex::encode(inner.state_key_hash.as_slice()).as_str(),
),
block_height,
key_bytes: write_set_size_info.key_bytes as i64,
value_bytes: write_set_size_info.value_bytes as i64,
total_bytes: write_set_size_info.key_bytes as i64
+ write_set_size_info.value_bytes as i64,
change_type,
resource_address: standardize_address(&inner.address),
write_set_change_index,
Expand All @@ -102,10 +93,6 @@ impl WriteSetChange {
hex::encode(inner.state_key_hash.as_slice()).as_str(),
),
block_height,
key_bytes: write_set_size_info.key_bytes as i64,
value_bytes: write_set_size_info.value_bytes as i64,
total_bytes: write_set_size_info.key_bytes as i64
+ write_set_size_info.value_bytes as i64,
change_type,
resource_address: standardize_address(&inner.address),
write_set_change_index,
Expand Down Expand Up @@ -142,10 +129,6 @@ impl WriteSetChange {
inner.state_key_hash.as_slice(),
),
block_height,
key_bytes: write_set_size_info.key_bytes as i64,
value_bytes: write_set_size_info.value_bytes as i64,
total_bytes: write_set_size_info.key_bytes as i64
+ write_set_size_info.value_bytes as i64,
change_type,
resource_address: standardize_address(&inner.address),
write_set_change_index,
Expand Down Expand Up @@ -178,10 +161,6 @@ impl WriteSetChange {
inner.state_key_hash.as_slice(),
),
block_height,
key_bytes: write_set_size_info.key_bytes as i64,
value_bytes: write_set_size_info.value_bytes as i64,
total_bytes: write_set_size_info.key_bytes as i64
+ write_set_size_info.value_bytes as i64,
change_type,
resource_address: standardize_address(&inner.address),
write_set_change_index,
Expand All @@ -206,10 +185,6 @@ impl WriteSetChange {
hex::encode(inner.state_key_hash.as_slice()).as_str(),
),
block_height,
key_bytes: write_set_size_info.key_bytes as i64,
value_bytes: write_set_size_info.value_bytes as i64,
total_bytes: write_set_size_info.key_bytes as i64
+ write_set_size_info.value_bytes as i64,
change_type,
resource_address: String::default(),
write_set_change_index,
Expand Down Expand Up @@ -237,10 +212,6 @@ impl WriteSetChange {
hex::encode(inner.state_key_hash.as_slice()).as_str(),
),
block_height,
key_bytes: write_set_size_info.key_bytes as i64,
value_bytes: write_set_size_info.value_bytes as i64,
total_bytes: write_set_size_info.key_bytes as i64
+ write_set_size_info.value_bytes as i64,
change_type,
resource_address: String::default(),
write_set_change_index,
Expand All @@ -257,38 +228,33 @@ impl WriteSetChange {
txn_version: i64,
block_height: i64,
timestamp: chrono::NaiveDateTime,
size_info: &[WriteOpSizeInfo],
) -> (Vec<Self>, Vec<WriteSetChangeDetail>) {
let results: Vec<(Self, WriteSetChangeDetail)> = write_set_changes
write_set_changes
.iter()
.zip_eq(size_info.iter())
.enumerate()
.filter_map(
|(write_set_change_index, (write_set_change, write_set_size_info))| {
match Self::from_write_set_change(
write_set_change,
write_set_change_index as i64,
txn_version,
block_height,
timestamp,
write_set_size_info,
) {
Ok(Some((change, detail))) => Some((change, detail)),
Ok(None) => None,
Err(e) => {
tracing::error!(
"Failed to convert write set change: {:?} with error: {:?}",
write_set_change,
e
);
panic!("Failed to convert write set change.")
},
}
},
)
.collect::<Vec<(Self, WriteSetChangeDetail)>>();

results.into_iter().unzip()
.filter_map(|(write_set_change_index, write_set_change)| {
match Self::from_write_set_change(
write_set_change,
write_set_change_index as i64,
txn_version,
block_height,
timestamp,
) {
Ok(Some((change, detail))) => Some((change, detail)),
Ok(None) => None,
Err(e) => {
tracing::error!(
"Failed to convert write set change: {:?} with error: {:?}",
write_set_change,
e
);
panic!("Failed to convert write set change.")
},
}
})
.collect::<Vec<(Self, WriteSetChangeDetail)>>()
.into_iter()
.unzip()
}

fn get_write_set_change_type(t: &WriteSetChangePB) -> String {
Expand Down

0 comments on commit fbffc33

Please sign in to comment.