From 5b7c6b88d06841b28864f7ab9c434e0dbf3b46aa Mon Sep 17 00:00:00 2001
From: yuunlimm
Date: Wed, 10 Jul 2024 15:52:34 -0700
Subject: [PATCH 1/3] add txn metadata info to parquet default processor

---
 .../db/common/models/default_models/mod.rs    |   1 +
 .../default_models/parquet_move_modules.rs    | 155 ++++++++++++++++++
 .../default_models/parquet_move_tables.rs     |  22 ++-
 .../default_models/parquet_transactions.rs    |  26 ++-
 .../parquet_write_set_changes.rs              |  99 ++++++-----
 .../src/processors/parquet_processors/mod.rs  |   2 +
 .../parquet_default_processor.rs              | 113 ++++++++++---
 7 files changed, 357 insertions(+), 61 deletions(-)
 create mode 100644 rust/processor/src/db/common/models/default_models/parquet_move_modules.rs

diff --git a/rust/processor/src/db/common/models/default_models/mod.rs b/rust/processor/src/db/common/models/default_models/mod.rs
index d3d54d58f..b631e75f9 100644
--- a/rust/processor/src/db/common/models/default_models/mod.rs
+++ b/rust/processor/src/db/common/models/default_models/mod.rs
@@ -9,6 +9,7 @@ pub mod transactions;
 pub mod write_set_changes;
 
 // parquet models
+pub mod parquet_move_modules;
 pub mod parquet_move_resources;
 pub mod parquet_move_tables;
 pub mod parquet_transactions;
diff --git a/rust/processor/src/db/common/models/default_models/parquet_move_modules.rs b/rust/processor/src/db/common/models/default_models/parquet_move_modules.rs
new file mode 100644
index 000000000..a6128e01f
--- /dev/null
+++ b/rust/processor/src/db/common/models/default_models/parquet_move_modules.rs
@@ -0,0 +1,155 @@
+// Copyright © Aptos Foundation
+// SPDX-License-Identifier: Apache-2.0
+
+#![allow(clippy::extra_unused_lifetimes)]
+
+use crate::{
+    bq_analytics::generic_parquet_processor::{GetTimeStamp, HasVersion, NamedTable},
+    utils::util::standardize_address,
+};
+use allocative_derive::Allocative;
+use aptos_protos::transaction::v1::{
+    DeleteModule, MoveModule as MoveModulePB, MoveModuleBytecode, WriteModule,
+};
+use field_count::FieldCount;
+use parquet_derive::ParquetRecordWriter;
+use serde::{Deserialize, Serialize};
+
+#[derive(
+    Allocative, Clone, Debug, Default, Deserialize, FieldCount, ParquetRecordWriter, Serialize,
+)]
+pub struct MoveModule {
+    pub txn_version: i64,
+    pub write_set_change_index: i64,
+    pub block_height: i64,
+    pub name: String,
+    pub address: String,
+    pub bytecode: Option<Vec<u8>>,
+    pub exposed_functions: Option<String>,
+    pub friends: Option<String>,
+    pub structs: Option<String>,
+    pub is_deleted: bool,
+    #[allocative(skip)]
+    pub block_timestamp: chrono::NaiveDateTime,
+}
+
+impl NamedTable for MoveModule {
+    const TABLE_NAME: &'static str = "move_modules";
+}
+
+impl HasVersion for MoveModule {
+    fn version(&self) -> i64 {
+        self.txn_version
+    }
+}
+
+impl GetTimeStamp for MoveModule {
+    fn get_timestamp(&self) -> chrono::NaiveDateTime {
+        self.block_timestamp
+    }
+}
+
+#[derive(Clone, Debug, Deserialize, Serialize)]
+pub struct MoveModuleByteCodeParsed {
+    pub address: String,
+    pub name: String,
+    pub bytecode: Vec<u8>,
+    pub exposed_functions: String,
+    pub friends: String,
+    pub structs: String,
+}
+
+impl MoveModule {
+    pub fn from_write_module(
+        write_module: &WriteModule,
+        write_set_change_index: i64,
+        txn_version: i64,
+        block_height: i64,
+        block_timestamp: chrono::NaiveDateTime,
+    ) -> Self {
+        let parsed_data = Self::convert_move_module_bytecode(write_module.data.as_ref().unwrap());
+        Self {
+            txn_version,
+            write_set_change_index,
+            block_height,
+            // TODO: remove the useless_asref lint when a new clippy nightly is released.
+            #[allow(clippy::useless_asref)]
+            name: parsed_data
+                .clone()
+                .map(|d| d.name.clone())
+                .unwrap_or_default(),
+            address: standardize_address(&write_module.address.to_string()),
+            bytecode: parsed_data.clone().map(|d| d.bytecode.clone()),
+            exposed_functions: parsed_data.clone().map(|d| d.exposed_functions.clone()),
+            friends: parsed_data.clone().map(|d| d.friends.clone()),
+            structs: parsed_data.map(|d| d.structs.clone()),
+            is_deleted: false,
+            block_timestamp,
+        }
+    }
+
+    pub fn from_delete_module(
+        delete_module: &DeleteModule,
+        write_set_change_index: i64,
+        txn_version: i64,
+        block_height: i64,
+        block_timestamp: chrono::NaiveDateTime,
+    ) -> Self {
+        Self {
+            txn_version,
+            block_height,
+            write_set_change_index,
+            // TODO: remove the useless_asref lint when a new clippy nightly is released.
+            #[allow(clippy::useless_asref)]
+            name: delete_module
+                .module
+                .clone()
+                .map(|d| d.name.clone())
+                .unwrap_or_default(),
+            address: standardize_address(&delete_module.address.to_string()),
+            bytecode: None,
+            exposed_functions: None,
+            friends: None,
+            structs: None,
+            is_deleted: true,
+            block_timestamp,
+        }
+    }
+
+    pub fn convert_move_module_bytecode(
+        mmb: &MoveModuleBytecode,
+    ) -> Option<MoveModuleByteCodeParsed> {
+        mmb.abi
+            .as_ref()
+            .map(|abi| Self::convert_move_module(abi, mmb.bytecode.clone()))
+    }
+
+    pub fn convert_move_module(
+        move_module: &MoveModulePB,
+        bytecode: Vec<u8>,
+    ) -> MoveModuleByteCodeParsed {
+        MoveModuleByteCodeParsed {
+            address: standardize_address(&move_module.address.to_string()),
+            name: move_module.name.clone(),
+            bytecode,
+            exposed_functions: move_module
+                .exposed_functions
+                .iter()
+                .map(|move_func| serde_json::to_value(move_func).unwrap())
+                .map(|value| canonical_json::to_string(&value).unwrap())
+                .collect(),
+            friends: move_module
+                .friends
+                .iter()
+                .map(|move_module_id| serde_json::to_value(move_module_id).unwrap())
+                .map(|value| canonical_json::to_string(&value).unwrap())
+                .collect(),
+            structs: move_module
+                .structs
+                .iter()
+                .map(|move_struct| serde_json::to_value(move_struct).unwrap())
+                .map(|value| canonical_json::to_string(&value).unwrap())
+                .collect(),
+        }
+    }
+}
diff --git a/rust/processor/src/db/common/models/default_models/parquet_move_tables.rs b/rust/processor/src/db/common/models/default_models/parquet_move_tables.rs
index 2243277d1..1dc8da47c 100644
--- a/rust/processor/src/db/common/models/default_models/parquet_move_tables.rs
+++ b/rust/processor/src/db/common/models/default_models/parquet_move_tables.rs
@@ -55,13 +55,33 @@ pub struct CurrentTableItem {
     pub last_transaction_version: i64,
     pub is_deleted: bool,
 }
-#[derive(Clone, Debug, Deserialize, FieldCount, Serialize)]
+
+#[derive(
+    Allocative, Clone, Debug, Default, Deserialize, FieldCount, Serialize, ParquetRecordWriter,
+)]
 pub struct TableMetadata {
     pub handle: String,
     pub key_type: String,
     pub value_type: String,
 }
 
+impl NamedTable for TableMetadata {
+    const TABLE_NAME: &'static str = "table_metadatas";
+}
+
+impl HasVersion for TableMetadata {
+    fn version(&self) -> i64 {
+        0 // This is a placeholder value to avoid a compile error
+    }
+}
+
+impl GetTimeStamp for TableMetadata {
+    fn get_timestamp(&self) -> chrono::NaiveDateTime {
+        #[warn(deprecated)]
+        chrono::NaiveDateTime::default()
+    }
+}
+
 impl TableItem {
     pub fn from_write_table_item(
         write_table_item: &WriteTableItem,
diff --git a/rust/processor/src/db/common/models/default_models/parquet_transactions.rs b/rust/processor/src/db/common/models/default_models/parquet_transactions.rs
index 953c299de..a4c79df9e 100644
--- a/rust/processor/src/db/common/models/default_models/parquet_transactions.rs
+++ b/rust/processor/src/db/common/models/default_models/parquet_transactions.rs
@@ -20,12 +20,15 @@ use ahash::AHashMap;
 use allocative_derive::Allocative;
 use aptos_protos::transaction::v1::{
     transaction::{TransactionType, TxnData},
-    Transaction as TransactionPB, TransactionInfo,
+    Transaction as TransactionPB, TransactionInfo, TransactionSizeInfo, WriteOpSizeInfo,
 };
 use field_count::FieldCount;
+use once_cell::sync::Lazy;
 use parquet_derive::ParquetRecordWriter;
 use serde::{Deserialize, Serialize};
 
+static EMPTY_VEC: Lazy<Vec<WriteOpSizeInfo>> = Lazy::new(Vec::new);
+
 #[derive(
     Allocative, Clone, Debug, Default, Deserialize, FieldCount, Serialize, ParquetRecordWriter,
 )]
@@ -46,6 +49,7 @@ pub struct Transaction {
     pub event_root_hash: String,
     pub state_checkpoint_hash: Option<String>,
     pub accumulator_root_hash: String,
+    pub txn_total_bytes: i64,
     #[allocative(skip)]
     pub block_timestamp: chrono::NaiveDateTime,
 }
@@ -109,6 +113,7 @@ impl Transaction {
         block_height: i64,
         epoch: i64,
         block_timestamp: chrono::NaiveDateTime,
+        txn_size_info: Option<&TransactionSizeInfo>,
     ) -> Self {
         Self {
             txn_type,
@@ -136,6 +141,8 @@ impl Transaction {
             num_write_set_changes: info.changes.len() as i64,
             epoch,
             payload_type,
+            txn_total_bytes: txn_size_info
+                .map_or(0, |size_info| size_info.transaction_bytes as i64),
             block_timestamp,
         }
     }
@@ -185,6 +192,14 @@ impl Transaction {
         #[allow(deprecated)]
         let block_timestamp = chrono::NaiveDateTime::from_timestamp_opt(timestamp.seconds, 0)
             .expect("Txn Timestamp is invalid!");
+
+        let txn_size_info = transaction.size_info.as_ref();
+        let write_set_size_info: &Vec<WriteOpSizeInfo> = if let Some(size_info) = txn_size_info {
+            size_info.write_op_size_info.as_ref()
+        } else {
+            &EMPTY_VEC
+        };
+
         match txn_data {
             TxnData::User(user_txn) => {
                 let (wsc, wsc_detail) = WriteSetChangeModel::from_write_set_changes(
@@ -192,6 +207,7 @@ impl Transaction {
                     txn_version,
                     block_height,
                     block_timestamp,
+                    write_set_size_info,
                 );
                 let request = &user_txn
                     .request
@@ -219,6 +235,7 @@ impl Transaction {
                         block_height,
                         epoch,
                         block_timestamp,
+                        txn_size_info,
                     ),
                     None,
                     wsc,
@@ -231,6 +248,7 @@ impl Transaction {
                     txn_version,
                     block_height,
                     block_timestamp,
+                    write_set_size_info,
                 );
                 let payload = genesis_txn.payload.as_ref().unwrap();
                 let payload_cleaned = get_clean_writeset(payload, txn_version);
@@ -251,6 +269,7 @@ impl Transaction {
                         block_height,
                         epoch,
                         block_timestamp,
+                        txn_size_info,
                     ),
                     None,
                     wsc,
@@ -263,6 +282,7 @@ impl Transaction {
                     txn_version,
                     block_height,
                     block_timestamp,
+                    write_set_size_info,
                 );
                 (
                     Self::from_transaction_info_with_data(
@@ -275,6 +295,7 @@ impl Transaction {
                         block_height,
                         epoch,
                         block_timestamp,
+                        txn_size_info,
                     ),
                     Some(BlockMetadataTransaction::from_transaction(
                         block_metadata_txn,
@@ -298,6 +319,7 @@ impl Transaction {
                         block_height,
                         epoch,
                         block_timestamp,
+                        txn_size_info,
                     ),
                     None,
                     vec![],
@@ -314,6 +336,7 @@ impl Transaction {
                         block_height,
                         epoch,
                         block_timestamp,
+                        txn_size_info,
                     ),
                     None,
                     vec![],
@@ -330,6 +353,7 @@ impl Transaction {
                         block_height,
                         epoch,
                         block_timestamp,
+                        txn_size_info,
                     ),
                     None,
                     vec![],
diff --git a/rust/processor/src/db/common/models/default_models/parquet_write_set_changes.rs b/rust/processor/src/db/common/models/default_models/parquet_write_set_changes.rs
index bab8628f6..66ba505dd 100644
--- a/rust/processor/src/db/common/models/default_models/parquet_write_set_changes.rs
+++ b/rust/processor/src/db/common/models/default_models/parquet_write_set_changes.rs
@@ -4,7 +4,7 @@
 #![allow(clippy::extra_unused_lifetimes)]
 
 use super::{
-    move_modules::MoveModule,
+    parquet_move_modules::MoveModule,
     parquet_move_resources::MoveResource,
     parquet_move_tables::{CurrentTableItem, TableItem, TableMetadata},
 };
@@ -16,13 +16,16 @@ use allocative_derive::Allocative;
 use anyhow::Context;
 use aptos_protos::transaction::v1::{
     write_set_change::{Change as WriteSetChangeEnum, Type as WriteSetChangeTypeEnum},
-    WriteSetChange as WriteSetChangePB,
+    WriteOpSizeInfo, WriteSetChange as WriteSetChangePB,
 };
 use field_count::FieldCount;
+use itertools::Itertools;
 use parquet_derive::ParquetRecordWriter;
 use serde::{Deserialize, Serialize};
 
-#[derive(Allocative, Clone, Debug, Deserialize, FieldCount, Serialize, ParquetRecordWriter)]
+#[derive(
+    Allocative, Clone, Debug, Default, Deserialize, FieldCount, Serialize, ParquetRecordWriter,
+)]
 pub struct WriteSetChange {
     pub txn_version: i64,
     pub write_set_change_index: i64,
@@ -30,6 +33,9 @@ pub struct WriteSetChange {
     pub change_type: String,
     pub resource_address: String,
     pub block_height: i64,
+    pub key_bytes: i64,
+    pub value_bytes: i64,
+    pub total_bytes: i64,
     #[allocative(skip)]
     pub block_timestamp: chrono::NaiveDateTime,
 }
@@ -44,21 +50,6 @@ impl HasVersion for WriteSetChange {
     }
 }
 
-impl Default for WriteSetChange {
-    fn default() -> Self {
-        Self {
-            txn_version: 0,
-            write_set_change_index: 0,
-            state_key_hash: "".to_string(),
-            change_type: "".to_string(),
-            resource_address: "".to_string(),
-            block_height: 0,
-            #[allow(deprecated)]
-            block_timestamp: chrono::NaiveDateTime::from_timestamp(0, 0),
-        }
-    }
-}
-
 impl GetTimeStamp for WriteSetChange {
     fn get_timestamp(&self) -> chrono::NaiveDateTime {
         self.block_timestamp
     }
@@ -72,6 +63,7 @@ impl WriteSetChange {
         txn_version: i64,
         block_height: i64,
         block_timestamp: chrono::NaiveDateTime,
+        write_set_size_info: &WriteOpSizeInfo,
     ) -> anyhow::Result<Option<(Self, WriteSetChangeDetail)>> {
         let change_type = Self::get_write_set_change_type(write_set_change);
         let change = write_set_change
@@ -86,6 +78,10 @@ impl WriteSetChange {
                     hex::encode(inner.state_key_hash.as_slice()).as_str(),
                 ),
                 block_height,
+                key_bytes: write_set_size_info.key_bytes as i64,
+                value_bytes: write_set_size_info.value_bytes as i64,
+                total_bytes: write_set_size_info.key_bytes as i64
+                    + write_set_size_info.value_bytes as i64,
                 change_type,
                 resource_address: standardize_address(&inner.address),
                 write_set_change_index,
@@ -96,6 +92,7 @@ impl WriteSetChange {
                     write_set_change_index,
                     txn_version,
                     block_height,
+                    block_timestamp,
                 )),
             ))),
             WriteSetChangeEnum::DeleteModule(inner) => Ok(Some((
@@ -105,6 +102,10 @@ impl WriteSetChange {
                     hex::encode(inner.state_key_hash.as_slice()).as_str(),
                 ),
                 block_height,
+                key_bytes: write_set_size_info.key_bytes as i64,
+                value_bytes: write_set_size_info.value_bytes as i64,
+                total_bytes: write_set_size_info.key_bytes as i64
+                    + write_set_size_info.value_bytes as i64,
                 change_type,
                 resource_address: standardize_address(&inner.address),
                 write_set_change_index,
@@ -115,6 +116,7 @@ impl WriteSetChange {
                     write_set_change_index,
                     txn_version,
                     block_height,
+                    block_timestamp,
                 )),
             ))),
             WriteSetChangeEnum::WriteResource(inner) => {
@@ -140,6 +142,10 @@ impl WriteSetChange {
                         inner.state_key_hash.as_slice(),
                     ),
                     block_height,
+                    key_bytes: write_set_size_info.key_bytes as i64,
+                    value_bytes: write_set_size_info.value_bytes as i64,
+                    total_bytes: write_set_size_info.key_bytes as i64
+                        + write_set_size_info.value_bytes as i64,
                     change_type,
                     resource_address: standardize_address(&inner.address),
                     write_set_change_index,
@@ -172,6 +178,10 @@ impl WriteSetChange {
                         inner.state_key_hash.as_slice(),
                     ),
                     block_height,
+                    key_bytes: write_set_size_info.key_bytes as i64,
+                    value_bytes: write_set_size_info.value_bytes as i64,
+                    total_bytes: write_set_size_info.key_bytes as i64
+                        + write_set_size_info.value_bytes as i64,
                     change_type,
                     resource_address: standardize_address(&inner.address),
                     write_set_change_index,
@@ -196,6 +206,10 @@ impl WriteSetChange {
                     hex::encode(inner.state_key_hash.as_slice()).as_str(),
                 ),
                 block_height,
+                key_bytes: write_set_size_info.key_bytes as i64,
+                value_bytes: write_set_size_info.value_bytes as i64,
+                total_bytes: write_set_size_info.key_bytes as i64
+                    + write_set_size_info.value_bytes as i64,
                 change_type,
                 resource_address: String::default(),
                 write_set_change_index,
@@ -223,6 +237,10 @@ impl WriteSetChange {
                     hex::encode(inner.state_key_hash.as_slice()).as_str(),
                 ),
                 block_height,
+                key_bytes: write_set_size_info.key_bytes as i64,
+                value_bytes: write_set_size_info.value_bytes as i64,
+                total_bytes: write_set_size_info.key_bytes as i64
+                    + write_set_size_info.value_bytes as i64,
                 change_type,
                 resource_address: String::default(),
                 write_set_change_index,
@@ -239,30 +257,35 @@ impl WriteSetChange {
         txn_version: i64,
         block_height: i64,
         timestamp: chrono::NaiveDateTime,
+        size_info: &[WriteOpSizeInfo],
     ) -> (Vec<Self>, Vec<WriteSetChangeDetail>) {
         let results: Vec<(Self, WriteSetChangeDetail)> = write_set_changes
             .iter()
+            .zip_eq(size_info.iter())
             .enumerate()
-            .filter_map(|(write_set_change_index, write_set_change)| {
-                match Self::from_write_set_change(
-                    write_set_change,
-                    write_set_change_index as i64,
-                    txn_version,
-                    block_height,
-                    timestamp,
-                ) {
-                    Ok(Some((change, detail))) => Some((change, detail)),
-                    Ok(None) => None,
-                    Err(e) => {
-                        tracing::error!(
-                            "Failed to convert write set change: {:?} with error: {:?}",
-                            write_set_change,
-                            e
-                        );
-                        panic!("Failed to convert write set change.")
-                    },
-                }
-            })
+            .filter_map(
+                |(write_set_change_index, (write_set_change, write_set_size_info))| {
+                    match Self::from_write_set_change(
+                        write_set_change,
+                        write_set_change_index as i64,
+                        txn_version,
+                        block_height,
+                        timestamp,
+                        write_set_size_info,
+                    ) {
+                        Ok(Some((change, detail))) => Some((change, detail)),
+                        Ok(None) => None,
+                        Err(e) => {
+                            tracing::error!(
+                                "Failed to convert write set change: {:?} with error: {:?}",
+                                write_set_change,
+                                e
+                            );
+                            panic!("Failed to convert write set change.")
+                        },
+                    }
+                },
+            )
             .collect::<Vec<_>>();
 
         results.into_iter().unzip()
diff --git a/rust/processor/src/processors/parquet_processors/mod.rs b/rust/processor/src/processors/parquet_processors/mod.rs
index 5c53077bc..19ac59e09 100644
--- a/rust/processor/src/processors/parquet_processors/mod.rs
+++ b/rust/processor/src/processors/parquet_processors/mod.rs
@@ -2,6 +2,8 @@ use std::time::Duration;
 
 pub mod parquet_default_processor;
 
+pub const GOOGLE_APPLICATION_CREDENTIALS: &str = "GOOGLE_APPLICATION_CREDENTIALS";
+
 pub trait UploadIntervalConfig {
     fn parquet_upload_interval_in_secs(&self) -> Duration;
 }
diff --git a/rust/processor/src/processors/parquet_processors/parquet_default_processor.rs b/rust/processor/src/processors/parquet_processors/parquet_default_processor.rs
index f0c77ac3a..7849f5bdd 100644
--- a/rust/processor/src/processors/parquet_processors/parquet_default_processor.rs
+++ b/rust/processor/src/processors/parquet_processors/parquet_default_processor.rs
@@ -7,13 +7,17 @@ use crate::{
         ParquetProcessingResult,
     },
     db::common::models::default_models::{
+        parquet_move_modules::MoveModule,
         parquet_move_resources::MoveResource,
         parquet_move_tables::{CurrentTableItem, TableItem, TableMetadata},
         parquet_transactions::{Transaction as ParquetTransaction, TransactionModel},
         parquet_write_set_changes::{WriteSetChangeDetail, WriteSetChangeModel},
     },
     gap_detectors::ProcessingResult,
-    processors::{parquet_processors::UploadIntervalConfig, ProcessorName, ProcessorTrait},
+    processors::{
+        parquet_processors::{UploadIntervalConfig, GOOGLE_APPLICATION_CREDENTIALS},
+        ProcessorName, ProcessorTrait,
+    },
     utils::database::ArcDbPool,
 };
 use ahash::AHashMap;
@@ -27,8 +31,6 @@ use std::{
     time::Duration,
 };
 
-const GOOGLE_APPLICATION_CREDENTIALS: &str = "GOOGLE_APPLICATION_CREDENTIALS";
-
 #[derive(Clone, Debug, Deserialize, Serialize)]
 #[serde(deny_unknown_fields)]
 pub struct ParquetDefaultProcessorConfig {
@@ -50,7 +52,9 @@ pub struct ParquetDefaultProcessor {
     transaction_sender: AsyncSender<ParquetDataGeneric<ParquetTransaction>>,
     move_resource_sender: AsyncSender<ParquetDataGeneric<MoveResource>>,
     wsc_sender: AsyncSender<ParquetDataGeneric<WriteSetChangeModel>>,
-    ti_sender: AsyncSender<ParquetDataGeneric<TableItem>>,
+    table_item_sender: AsyncSender<ParquetDataGeneric<TableItem>>,
+    move_module_sender: AsyncSender<ParquetDataGeneric<MoveModule>>,
+    table_metadata_sender: AsyncSender<ParquetDataGeneric<TableMetadata>>,
 }
 
 // TODO: Since each table item has a different allocated size, the pace of backfilling to PQ varies a lot.
@@ -95,7 +99,26 @@ impl ParquetDefaultProcessor {
             config.parquet_upload_interval_in_secs(),
         );
 
-        let ti_sender = create_parquet_handler_loop::<TableItem>(
+        let table_item_sender = create_parquet_handler_loop::<TableItem>(
+            new_gap_detector_sender.clone(),
+            ProcessorName::ParquetDefaultProcessor.into(),
+            config.bucket_name.clone(),
+            config.bucket_root.clone(),
+            config.parquet_handler_response_channel_size,
+            config.max_buffer_size,
+            config.parquet_upload_interval_in_secs(),
+        );
+        let move_module_sender = create_parquet_handler_loop::<MoveModule>(
+            new_gap_detector_sender.clone(),
+            ProcessorName::ParquetDefaultProcessor.into(),
+            config.bucket_name.clone(),
+            config.bucket_root.clone(),
+            config.parquet_handler_response_channel_size,
+            config.max_buffer_size,
+            config.parquet_upload_interval_in_secs(),
+        );
+
+        let table_metadata_sender = create_parquet_handler_loop::<TableMetadata>(
             new_gap_detector_sender.clone(),
             ProcessorName::ParquetDefaultProcessor.into(),
             config.bucket_name.clone(),
             config.bucket_root.clone(),
             config.parquet_handler_response_channel_size,
             config.max_buffer_size,
             config.parquet_upload_interval_in_secs(),
         );
@@ -110,7 +133,9 @@ impl ParquetDefaultProcessor {
             transaction_sender,
             move_resource_sender,
             wsc_sender,
-            ti_sender,
+            table_item_sender,
+            move_module_sender,
+            table_metadata_sender,
         }
     }
 }
@@ -119,11 +144,13 @@ impl Debug for ParquetDefaultProcessor {
     fn fmt(&self, f: &mut Formatter<'_>) -> Result {
         write!(
            f,
-            "ParquetProcessor {{ capacity of t channel: {:?}, capacity of mr channel: {:?}, capacity of wsc channel: {:?}, capacity of ti channel: {:?} }}",
+            "ParquetProcessor {{ capacity of transactions channel: {:?}, capacity of move resource channel: {:?}, capacity of wsc channel: {:?}, capacity of table items channel: {:?}, capacity of move_module channel: {:?}, capacity of table_metadata channel: {:?} }}",
             &self.transaction_sender.capacity(),
             &self.move_resource_sender.capacity(),
             &self.wsc_sender.capacity(),
-            &self.ti_sender.capacity(),
+            &self.table_item_sender.capacity(),
+            &self.move_module_sender.capacity(),
+            &self.table_metadata_sender.capacity(),
         )
     }
 }
@@ -143,13 +170,22 @@ impl ProcessorTrait for ParquetDefaultProcessor {
     ) -> anyhow::Result<ProcessingResult> {
         let last_transaction_timestamp = transactions.last().unwrap().timestamp.clone();
 
-        let ((mr, wsc, t, ti), transaction_version_to_struct_count) =
-            tokio::task::spawn_blocking(move || process_transactions(transactions))
-                .await
-                .expect("Failed to spawn_blocking for TransactionModel::from_transactions");
+        let (
+            (
+                move_resources,
+                write_set_changes,
+                transactions,
+                table_items,
+                move_modules,
+                table_metadata,
+            ),
+            transaction_version_to_struct_count,
+        ) = tokio::task::spawn_blocking(move || process_transactions(transactions))
+            .await
+            .expect("Failed to spawn_blocking for TransactionModel::from_transactions");
 
         let mr_parquet_data = ParquetDataGeneric {
-            data: mr,
+            data: move_resources,
             transaction_version_to_struct_count: transaction_version_to_struct_count.clone(),
         };
 
@@ -159,7 +195,7 @@ impl ProcessorTrait for ParquetDefaultProcessor {
             .map_err(|e| anyhow!("Failed to send to parquet manager: {}", e))?;
 
         let wsc_parquet_data = ParquetDataGeneric {
-            data: wsc,
+            data: write_set_changes,
             transaction_version_to_struct_count: transaction_version_to_struct_count.clone(),
         };
         self.wsc_sender
@@ -168,7 +204,7 @@ impl ProcessorTrait for ParquetDefaultProcessor {
             .map_err(|e| anyhow!("Failed to send to parquet manager: {}", e))?;
 
         let t_parquet_data = ParquetDataGeneric {
-            data: t,
+            data: transactions,
             transaction_version_to_struct_count: transaction_version_to_struct_count.clone(),
         };
         self.transaction_sender
@@ -177,15 +213,35 @@ impl ProcessorTrait for ParquetDefaultProcessor {
             .map_err(|e| anyhow!("Failed to send to parquet manager: {}", e))?;
 
         let ti_parquet_data = ParquetDataGeneric {
-            data: ti,
+            data: table_items,
             transaction_version_to_struct_count: transaction_version_to_struct_count.clone(),
         };
 
-        self.ti_sender
+        self.table_item_sender
             .send(ti_parquet_data)
             .await
             .map_err(|e| anyhow!("Failed to send to parquet manager: {}", e))?;
 
+        let mm_parquet_data = ParquetDataGeneric {
+            data: move_modules,
+            transaction_version_to_struct_count: transaction_version_to_struct_count.clone(),
+        };
+
+        self.move_module_sender
+            .send(mm_parquet_data)
+            .await
+            .map_err(|e| anyhow!("Failed to send to parquet manager: {}", e))?;
+
+        let tm_parquet_data = ParquetDataGeneric {
+            data: table_metadata,
+            transaction_version_to_struct_count: transaction_version_to_struct_count.clone(),
+        };
+
+        self.table_metadata_sender
+            .send(tm_parquet_data)
+            .await
+            .map_err(|e| anyhow!("Failed to send to parquet manager: {}", e))?;
+
         Ok(ProcessingResult::ParquetProcessingResult(
             ParquetProcessingResult {
                 start_version: start_version as i64,
@@ -209,6 +265,8 @@ pub fn process_transactions(
         Vec<MoveResource>,
         Vec<WriteSetChangeModel>,
         Vec<ParquetTransaction>,
         Vec<TableItem>,
+        Vec<MoveModule>,
+        Vec<TableMetadata>,
     ),
     AHashMap<i64, i64>,
 ) {
@@ -229,7 +287,9 @@ pub fn process_transactions(
             match detail {
                 WriteSetChangeDetail::Module(module) => {
                     move_modules.push(module.clone());
-                    // transaction_version_to_struct_count.entry(module.transaction_version).and_modify(|e| *e += 1); // TODO: uncomment in Tranche2
+                    transaction_version_to_struct_count
+                        .entry(module.txn_version)
+                        .and_modify(|e| *e += 1);
                 },
                 WriteSetChangeDetail::Resource(resource) => {
                     transaction_version_to_struct_count
@@ -238,8 +298,10 @@ pub fn process_transactions(
                     move_resources.push(resource);
                 },
                 WriteSetChangeDetail::Table(item, current_item, metadata) => {
+                    let txn_version = item.txn_version;
+
                     transaction_version_to_struct_count
-                        .entry(item.txn_version)
+                        .entry(txn_version)
                         .and_modify(|e| *e += 1);
 
                     table_items.push(item);
@@ -254,7 +316,9 @@ pub fn process_transactions(
 
                     if let Some(meta) = metadata {
                         table_metadata.insert(meta.handle.clone(), meta);
-                        // transaction_version_to_struct_count.entry(current_item.last_transaction_version).and_modify(|e| *e += 1); // TODO: uncomment in Tranche2
+                        transaction_version_to_struct_count
+                            .entry(txn_version)
+                            .and_modify(|e| *e += 1);
                     }
                 },
             }
@@ -271,7 +335,14 @@ pub fn process_transactions(
     table_metadata.sort_by(|a, b| a.handle.cmp(&b.handle));
 
     (
-        (move_resources, write_set_changes, txns, table_items),
+        (
+            move_resources,
+            write_set_changes,
+            txns,
+            table_items,
+            move_modules,
+            table_metadata,
+        ),
         transaction_version_to_struct_count,
     )
 }

From 40abea660db1d919dc985f5be50856d7095ebac2 Mon Sep 17 00:00:00 2001
From: yuunlimm
Date: Thu, 11 Jul 2024 14:27:45 -0700
Subject: [PATCH 2/3] remove static variable

---
 .../models/default_models/parquet_transactions.rs | 13 +++++--------
 1 file changed, 5 insertions(+), 8 deletions(-)

diff --git a/rust/processor/src/db/common/models/default_models/parquet_transactions.rs b/rust/processor/src/db/common/models/default_models/parquet_transactions.rs
index a4c79df9e..0b7ff2be2 100644
--- a/rust/processor/src/db/common/models/default_models/parquet_transactions.rs
+++ b/rust/processor/src/db/common/models/default_models/parquet_transactions.rs
@@ -20,14 +20,12 @@ use ahash::AHashMap;
 use allocative_derive::Allocative;
 use aptos_protos::transaction::v1::{
     transaction::{TransactionType, TxnData},
-    Transaction as TransactionPB, TransactionInfo, TransactionSizeInfo, WriteOpSizeInfo,
+    Transaction as TransactionPB, TransactionInfo, TransactionSizeInfo,
 };
 use field_count::FieldCount;
-use once_cell::sync::Lazy;
 use parquet_derive::ParquetRecordWriter;
 use serde::{Deserialize, Serialize};
 
-static EMPTY_VEC: Lazy<Vec<WriteOpSizeInfo>> = Lazy::new(Vec::new);
 
 #[derive(
     Allocative, Clone, Debug, Default, Deserialize, FieldCount, Serialize, ParquetRecordWriter,
 )]
@@ -194,11 +192,10 @@ impl Transaction {
             .expect("Txn Timestamp is invalid!");
 
         let txn_size_info = transaction.size_info.as_ref();
-        let write_set_size_info: &Vec<WriteOpSizeInfo> = if let Some(size_info) = txn_size_info {
-            size_info.write_op_size_info.as_ref()
-        } else {
-            &EMPTY_VEC
-        };
+        let empty_vec = Vec::new();
+        let write_set_size_info = txn_size_info
+            .as_ref()
+            .map_or(&empty_vec, |size_info| &size_info.write_op_size_info);
 
         match txn_data {
             TxnData::User(user_txn) => {

From 43346955eeb73dbec0059ad21f5e5c2daeaf9e3f Mon Sep 17 00:00:00 2001
From: yuunlimm
Date: Thu, 11 Jul 2024 18:32:58 -0700
Subject: [PATCH 3/3] lint

---
 .../src/db/common/models/default_models/parquet_transactions.rs | 1 -
 1 file changed, 1 deletion(-)

diff --git a/rust/processor/src/db/common/models/default_models/parquet_transactions.rs b/rust/processor/src/db/common/models/default_models/parquet_transactions.rs
index 0b7ff2be2..8e44c9dbc 100644
--- a/rust/processor/src/db/common/models/default_models/parquet_transactions.rs
+++ b/rust/processor/src/db/common/models/default_models/parquet_transactions.rs
@@ -26,7 +26,6 @@ use field_count::FieldCount;
 use parquet_derive::ParquetRecordWriter;
 use serde::{Deserialize, Serialize};
 
-
 #[derive(
     Allocative, Clone, Debug, Default, Deserialize, FieldCount, Serialize, ParquetRecordWriter,
 )]
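
Reviewer note (editorial sketch, not part of the patch series): the pairing added to from_write_set_changes relies on itertools::zip_eq, so the processor now assumes a transaction's write_op_size_info list carries exactly one entry per write-set change; zip_eq panics on a length mismatch rather than silently truncating the way zip would. A minimal standalone illustration of that failure mode, using made-up data (the names and sizes below are illustrative only):

use itertools::Itertools;

fn main() {
    // One hypothetical write-set change per entry.
    let changes = ["write_module", "write_resource", "write_table_item"];
    // One (key_bytes, value_bytes) pair per change, mirroring WriteOpSizeInfo.
    let sizes = [(32u64, 128u64), (32, 512), (48, 64)];

    // zip_eq panics if the two sides differ in length, which is the loud
    // failure mode the processor wants for malformed size_info.
    for (idx, (change, (key_bytes, value_bytes))) in
        changes.iter().zip_eq(sizes.iter()).enumerate()
    {
        println!(
            "wsc #{idx}: {change} key_bytes={key_bytes} value_bytes={value_bytes} total_bytes={}",
            key_bytes + value_bytes
        );
    }
}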