Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix parquet default processor #458

Merged
merged 2 commits into from
Jul 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -191,10 +191,6 @@ impl Transaction {
.expect("Txn Timestamp is invalid!");

let txn_size_info = transaction.size_info.as_ref();
let empty_vec = Vec::new();
let write_set_size_info = txn_size_info
.as_ref()
.map_or(&empty_vec, |size_info| &size_info.write_op_size_info);

match txn_data {
TxnData::User(user_txn) => {
Expand All @@ -203,7 +199,6 @@ impl Transaction {
txn_version,
block_height,
block_timestamp,
write_set_size_info,
);
let request = &user_txn
.request
Expand Down Expand Up @@ -244,7 +239,6 @@ impl Transaction {
txn_version,
block_height,
block_timestamp,
write_set_size_info,
);
let payload = genesis_txn.payload.as_ref().unwrap();
let payload_cleaned = get_clean_writeset(payload, txn_version);
Expand Down Expand Up @@ -278,7 +272,6 @@ impl Transaction {
txn_version,
block_height,
block_timestamp,
write_set_size_info,
);
(
Self::from_transaction_info_with_data(
Expand Down Expand Up @@ -327,7 +320,6 @@ impl Transaction {
txn_version,
block_height,
block_timestamp,
write_set_size_info,
);
(
Self::from_transaction_info_with_data(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,9 @@ use allocative_derive::Allocative;
use anyhow::Context;
use aptos_protos::transaction::v1::{
write_set_change::{Change as WriteSetChangeEnum, Type as WriteSetChangeTypeEnum},
WriteOpSizeInfo, WriteSetChange as WriteSetChangePB,
WriteSetChange as WriteSetChangePB,
};
use field_count::FieldCount;
use itertools::Itertools;
use parquet_derive::ParquetRecordWriter;
use serde::{Deserialize, Serialize};

Expand All @@ -33,9 +32,6 @@ pub struct WriteSetChange {
pub change_type: String,
pub resource_address: String,
pub block_height: i64,
pub key_bytes: i64,
pub value_bytes: i64,
pub total_bytes: i64,
#[allocative(skip)]
pub block_timestamp: chrono::NaiveDateTime,
}
Expand Down Expand Up @@ -63,7 +59,6 @@ impl WriteSetChange {
txn_version: i64,
block_height: i64,
block_timestamp: chrono::NaiveDateTime,
write_set_size_info: &WriteOpSizeInfo,
) -> anyhow::Result<Option<(Self, WriteSetChangeDetail)>> {
let change_type = Self::get_write_set_change_type(write_set_change);
let change = write_set_change
Expand All @@ -78,10 +73,6 @@ impl WriteSetChange {
hex::encode(inner.state_key_hash.as_slice()).as_str(),
),
block_height,
key_bytes: write_set_size_info.key_bytes as i64,
value_bytes: write_set_size_info.value_bytes as i64,
total_bytes: write_set_size_info.key_bytes as i64
+ write_set_size_info.value_bytes as i64,
change_type,
resource_address: standardize_address(&inner.address),
write_set_change_index,
Expand All @@ -102,10 +93,6 @@ impl WriteSetChange {
hex::encode(inner.state_key_hash.as_slice()).as_str(),
),
block_height,
key_bytes: write_set_size_info.key_bytes as i64,
value_bytes: write_set_size_info.value_bytes as i64,
total_bytes: write_set_size_info.key_bytes as i64
+ write_set_size_info.value_bytes as i64,
change_type,
resource_address: standardize_address(&inner.address),
write_set_change_index,
Expand Down Expand Up @@ -142,10 +129,6 @@ impl WriteSetChange {
inner.state_key_hash.as_slice(),
),
block_height,
key_bytes: write_set_size_info.key_bytes as i64,
value_bytes: write_set_size_info.value_bytes as i64,
total_bytes: write_set_size_info.key_bytes as i64
+ write_set_size_info.value_bytes as i64,
change_type,
resource_address: standardize_address(&inner.address),
write_set_change_index,
Expand Down Expand Up @@ -178,10 +161,6 @@ impl WriteSetChange {
inner.state_key_hash.as_slice(),
),
block_height,
key_bytes: write_set_size_info.key_bytes as i64,
value_bytes: write_set_size_info.value_bytes as i64,
total_bytes: write_set_size_info.key_bytes as i64
+ write_set_size_info.value_bytes as i64,
change_type,
resource_address: standardize_address(&inner.address),
write_set_change_index,
Expand All @@ -206,10 +185,6 @@ impl WriteSetChange {
hex::encode(inner.state_key_hash.as_slice()).as_str(),
),
block_height,
key_bytes: write_set_size_info.key_bytes as i64,
value_bytes: write_set_size_info.value_bytes as i64,
total_bytes: write_set_size_info.key_bytes as i64
+ write_set_size_info.value_bytes as i64,
change_type,
resource_address: String::default(),
write_set_change_index,
Expand Down Expand Up @@ -237,10 +212,6 @@ impl WriteSetChange {
hex::encode(inner.state_key_hash.as_slice()).as_str(),
),
block_height,
key_bytes: write_set_size_info.key_bytes as i64,
value_bytes: write_set_size_info.value_bytes as i64,
total_bytes: write_set_size_info.key_bytes as i64
+ write_set_size_info.value_bytes as i64,
change_type,
resource_address: String::default(),
write_set_change_index,
Expand All @@ -257,38 +228,33 @@ impl WriteSetChange {
txn_version: i64,
block_height: i64,
timestamp: chrono::NaiveDateTime,
size_info: &[WriteOpSizeInfo],
) -> (Vec<Self>, Vec<WriteSetChangeDetail>) {
let results: Vec<(Self, WriteSetChangeDetail)> = write_set_changes
write_set_changes
.iter()
.zip_eq(size_info.iter())
.enumerate()
.filter_map(
|(write_set_change_index, (write_set_change, write_set_size_info))| {
match Self::from_write_set_change(
write_set_change,
write_set_change_index as i64,
txn_version,
block_height,
timestamp,
write_set_size_info,
) {
Ok(Some((change, detail))) => Some((change, detail)),
Ok(None) => None,
Err(e) => {
tracing::error!(
"Failed to convert write set change: {:?} with error: {:?}",
write_set_change,
e
);
panic!("Failed to convert write set change.")
},
}
},
)
.collect::<Vec<(Self, WriteSetChangeDetail)>>();

results.into_iter().unzip()
.filter_map(|(write_set_change_index, write_set_change)| {
match Self::from_write_set_change(
write_set_change,
write_set_change_index as i64,
txn_version,
block_height,
timestamp,
) {
Ok(Some((change, detail))) => Some((change, detail)),
Ok(None) => None,
Err(e) => {
tracing::error!(
"Failed to convert write set change: {:?} with error: {:?}",
write_set_change,
e
);
panic!("Failed to convert write set change.")
},
}
})
.collect::<Vec<(Self, WriteSetChangeDetail)>>()
.into_iter()
.unzip()
}

fn get_write_set_change_type(t: &WriteSetChangePB) -> String {
Expand Down
Loading