From 40a987cdde300d3bdb8cd003d8645cc710f36cb0 Mon Sep 17 00:00:00 2001 From: yuunlimm Date: Fri, 12 Jul 2024 08:19:40 -0700 Subject: [PATCH 1/2] add log --- .../models/default_models/parquet_write_set_changes.rs | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/rust/processor/src/db/common/models/default_models/parquet_write_set_changes.rs b/rust/processor/src/db/common/models/default_models/parquet_write_set_changes.rs index 66ba505dd..cd87da7e9 100644 --- a/rust/processor/src/db/common/models/default_models/parquet_write_set_changes.rs +++ b/rust/processor/src/db/common/models/default_models/parquet_write_set_changes.rs @@ -259,6 +259,12 @@ impl WriteSetChange { timestamp: chrono::NaiveDateTime, size_info: &[WriteOpSizeInfo], ) -> (Vec, Vec) { + tracing::info!( + "Converting {} write set changes with the {} size_info provided for version {}.", + write_set_changes.len(), + size_info.len(), + txn_version + ); let results: Vec<(Self, WriteSetChangeDetail)> = write_set_changes .iter() .zip_eq(size_info.iter()) From 84e2a55768016d12f82207c3d64d3d3476071260 Mon Sep 17 00:00:00 2001 From: yuunlimm Date: Fri, 12 Jul 2024 18:07:56 -0700 Subject: [PATCH 2/2] remove size info from write set changes --- .../default_models/parquet_transactions.rs | 8 -- .../parquet_write_set_changes.rs | 90 ++++++------------- 2 files changed, 25 insertions(+), 73 deletions(-) diff --git a/rust/processor/src/db/common/models/default_models/parquet_transactions.rs b/rust/processor/src/db/common/models/default_models/parquet_transactions.rs index 488867879..22f10715d 100644 --- a/rust/processor/src/db/common/models/default_models/parquet_transactions.rs +++ b/rust/processor/src/db/common/models/default_models/parquet_transactions.rs @@ -191,10 +191,6 @@ impl Transaction { .expect("Txn Timestamp is invalid!"); let txn_size_info = transaction.size_info.as_ref(); - let empty_vec = Vec::new(); - let write_set_size_info = txn_size_info - .as_ref() - .map_or(&empty_vec, |size_info| &size_info.write_op_size_info); match txn_data { TxnData::User(user_txn) => { @@ -203,7 +199,6 @@ impl Transaction { txn_version, block_height, block_timestamp, - write_set_size_info, ); let request = &user_txn .request @@ -244,7 +239,6 @@ impl Transaction { txn_version, block_height, block_timestamp, - write_set_size_info, ); let payload = genesis_txn.payload.as_ref().unwrap(); let payload_cleaned = get_clean_writeset(payload, txn_version); @@ -278,7 +272,6 @@ impl Transaction { txn_version, block_height, block_timestamp, - write_set_size_info, ); ( Self::from_transaction_info_with_data( @@ -327,7 +320,6 @@ impl Transaction { txn_version, block_height, block_timestamp, - write_set_size_info, ); ( Self::from_transaction_info_with_data( diff --git a/rust/processor/src/db/common/models/default_models/parquet_write_set_changes.rs b/rust/processor/src/db/common/models/default_models/parquet_write_set_changes.rs index cd87da7e9..0c98243c2 100644 --- a/rust/processor/src/db/common/models/default_models/parquet_write_set_changes.rs +++ b/rust/processor/src/db/common/models/default_models/parquet_write_set_changes.rs @@ -16,10 +16,9 @@ use allocative_derive::Allocative; use anyhow::Context; use aptos_protos::transaction::v1::{ write_set_change::{Change as WriteSetChangeEnum, Type as WriteSetChangeTypeEnum}, - WriteOpSizeInfo, WriteSetChange as WriteSetChangePB, + WriteSetChange as WriteSetChangePB, }; use field_count::FieldCount; -use itertools::Itertools; use parquet_derive::ParquetRecordWriter; use serde::{Deserialize, Serialize}; @@ -33,9 +32,6 @@ pub struct WriteSetChange { pub change_type: String, pub resource_address: String, pub block_height: i64, - pub key_bytes: i64, - pub value_bytes: i64, - pub total_bytes: i64, #[allocative(skip)] pub block_timestamp: chrono::NaiveDateTime, } @@ -63,7 +59,6 @@ impl WriteSetChange { txn_version: i64, block_height: i64, block_timestamp: chrono::NaiveDateTime, - write_set_size_info: &WriteOpSizeInfo, ) -> anyhow::Result> { let change_type = Self::get_write_set_change_type(write_set_change); let change = write_set_change @@ -78,10 +73,6 @@ impl WriteSetChange { hex::encode(inner.state_key_hash.as_slice()).as_str(), ), block_height, - key_bytes: write_set_size_info.key_bytes as i64, - value_bytes: write_set_size_info.value_bytes as i64, - total_bytes: write_set_size_info.key_bytes as i64 - + write_set_size_info.value_bytes as i64, change_type, resource_address: standardize_address(&inner.address), write_set_change_index, @@ -102,10 +93,6 @@ impl WriteSetChange { hex::encode(inner.state_key_hash.as_slice()).as_str(), ), block_height, - key_bytes: write_set_size_info.key_bytes as i64, - value_bytes: write_set_size_info.value_bytes as i64, - total_bytes: write_set_size_info.key_bytes as i64 - + write_set_size_info.value_bytes as i64, change_type, resource_address: standardize_address(&inner.address), write_set_change_index, @@ -142,10 +129,6 @@ impl WriteSetChange { inner.state_key_hash.as_slice(), ), block_height, - key_bytes: write_set_size_info.key_bytes as i64, - value_bytes: write_set_size_info.value_bytes as i64, - total_bytes: write_set_size_info.key_bytes as i64 - + write_set_size_info.value_bytes as i64, change_type, resource_address: standardize_address(&inner.address), write_set_change_index, @@ -178,10 +161,6 @@ impl WriteSetChange { inner.state_key_hash.as_slice(), ), block_height, - key_bytes: write_set_size_info.key_bytes as i64, - value_bytes: write_set_size_info.value_bytes as i64, - total_bytes: write_set_size_info.key_bytes as i64 - + write_set_size_info.value_bytes as i64, change_type, resource_address: standardize_address(&inner.address), write_set_change_index, @@ -206,10 +185,6 @@ impl WriteSetChange { hex::encode(inner.state_key_hash.as_slice()).as_str(), ), block_height, - key_bytes: write_set_size_info.key_bytes as i64, - value_bytes: write_set_size_info.value_bytes as i64, - total_bytes: write_set_size_info.key_bytes as i64 - + write_set_size_info.value_bytes as i64, change_type, resource_address: String::default(), write_set_change_index, @@ -237,10 +212,6 @@ impl WriteSetChange { hex::encode(inner.state_key_hash.as_slice()).as_str(), ), block_height, - key_bytes: write_set_size_info.key_bytes as i64, - value_bytes: write_set_size_info.value_bytes as i64, - total_bytes: write_set_size_info.key_bytes as i64 - + write_set_size_info.value_bytes as i64, change_type, resource_address: String::default(), write_set_change_index, @@ -257,44 +228,33 @@ impl WriteSetChange { txn_version: i64, block_height: i64, timestamp: chrono::NaiveDateTime, - size_info: &[WriteOpSizeInfo], ) -> (Vec, Vec) { - tracing::info!( - "Converting {} write set changes with the {} size_info provided for version {}.", - write_set_changes.len(), - size_info.len(), - txn_version - ); - let results: Vec<(Self, WriteSetChangeDetail)> = write_set_changes + write_set_changes .iter() - .zip_eq(size_info.iter()) .enumerate() - .filter_map( - |(write_set_change_index, (write_set_change, write_set_size_info))| { - match Self::from_write_set_change( - write_set_change, - write_set_change_index as i64, - txn_version, - block_height, - timestamp, - write_set_size_info, - ) { - Ok(Some((change, detail))) => Some((change, detail)), - Ok(None) => None, - Err(e) => { - tracing::error!( - "Failed to convert write set change: {:?} with error: {:?}", - write_set_change, - e - ); - panic!("Failed to convert write set change.") - }, - } - }, - ) - .collect::>(); - - results.into_iter().unzip() + .filter_map(|(write_set_change_index, write_set_change)| { + match Self::from_write_set_change( + write_set_change, + write_set_change_index as i64, + txn_version, + block_height, + timestamp, + ) { + Ok(Some((change, detail))) => Some((change, detail)), + Ok(None) => None, + Err(e) => { + tracing::error!( + "Failed to convert write set change: {:?} with error: {:?}", + write_set_change, + e + ); + panic!("Failed to convert write set change.") + }, + } + }) + .collect::>() + .into_iter() + .unzip() } fn get_write_set_change_type(t: &WriteSetChangePB) -> String {