diff --git a/rust/processor/src/db/common/models/default_models/move_tables.rs b/rust/processor/src/db/common/models/default_models/move_tables.rs index e02a713c6..4571ff0ed 100644 --- a/rust/processor/src/db/common/models/default_models/move_tables.rs +++ b/rust/processor/src/db/common/models/default_models/move_tables.rs @@ -47,6 +47,111 @@ pub struct TableMetadata { pub value_type: String, } +pub trait TableItemConvertible { + fn from_raw(raw_item: &RawTableItem) -> Self; +} + +pub struct RawTableItem { + pub txn_version: i64, + pub block_timestamp: chrono::NaiveDateTime, + pub write_set_change_index: i64, + pub transaction_block_height: i64, + pub table_key: String, + pub table_handle: String, + pub decoded_key: String, + pub decoded_value: Option, + pub is_deleted: bool, +} + +impl RawTableItem { + pub fn from_write_table_item( + write_table_item: &WriteTableItem, + write_set_change_index: i64, + txn_version: i64, + transaction_block_height: i64, + block_timestamp: chrono::NaiveDateTime, + ) -> (Self, CurrentTableItem) { + ( + Self { + txn_version, + write_set_change_index, + transaction_block_height, + table_key: write_table_item.key.to_string(), + table_handle: standardize_address(&write_table_item.handle.to_string()), + decoded_key: write_table_item.data.as_ref().unwrap().key.clone(), + decoded_value: Some(write_table_item.data.as_ref().unwrap().value.clone()), + is_deleted: false, + block_timestamp, + }, + CurrentTableItem { + table_handle: standardize_address(&write_table_item.handle.to_string()), + key_hash: hash_str(&write_table_item.key.to_string()), + key: write_table_item.key.to_string(), + decoded_key: serde_json::from_str( + write_table_item.data.as_ref().unwrap().key.as_str(), + ) + .unwrap(), + decoded_value: serde_json::from_str( + write_table_item.data.as_ref().unwrap().value.as_str(), + ) + .unwrap(), + last_transaction_version: txn_version, + is_deleted: false, + }, + ) + } + + pub fn from_delete_table_item( + delete_table_item: &DeleteTableItem, + write_set_change_index: i64, + txn_version: i64, + transaction_block_height: i64, + block_timestamp: chrono::NaiveDateTime, + ) -> (Self, CurrentTableItem) { + ( + Self { + txn_version, + write_set_change_index, + transaction_block_height, + table_key: delete_table_item.key.to_string(), + table_handle: standardize_address(&delete_table_item.handle.to_string()), + decoded_key: delete_table_item.data.as_ref().unwrap().key.clone(), + decoded_value: None, + is_deleted: true, + block_timestamp, + }, + CurrentTableItem { + table_handle: standardize_address(&delete_table_item.handle.to_string()), + key_hash: hash_str(&delete_table_item.key.to_string()), + key: delete_table_item.key.to_string(), + decoded_key: serde_json::from_str( + delete_table_item.data.as_ref().unwrap().key.as_str(), + ) + .unwrap(), + decoded_value: None, + last_transaction_version: txn_version, + is_deleted: true, + }, + ) + } +} + +// In move_tables.rs +impl TableItemConvertible for TableItem { + fn from_raw(raw_item: &RawTableItem) -> Self { + TableItem { + transaction_version: raw_item.txn_version, + write_set_change_index: raw_item.write_set_change_index, + transaction_block_height: raw_item.transaction_block_height, + key: raw_item.table_key.clone(), + table_handle: raw_item.table_handle.clone(), + decoded_key: serde_json::from_str(raw_item.decoded_key.as_str()).unwrap(), + decoded_value: raw_item.decoded_value.clone().map(|v| serde_json::from_str(v.as_str()).unwrap()), + is_deleted: raw_item.is_deleted, + } + } +} + impl TableItem { pub fn from_write_table_item( write_table_item: &WriteTableItem, diff --git a/rust/processor/src/db/common/models/default_models/parquet_move_tables.rs b/rust/processor/src/db/common/models/default_models/parquet_move_tables.rs index 1dc8da47c..0373f0d1f 100644 --- a/rust/processor/src/db/common/models/default_models/parquet_move_tables.rs +++ b/rust/processor/src/db/common/models/default_models/parquet_move_tables.rs @@ -12,6 +12,7 @@ use aptos_protos::transaction::v1::{DeleteTableItem, WriteTableItem}; use field_count::FieldCount; use parquet_derive::ParquetRecordWriter; use serde::{Deserialize, Serialize}; +use crate::db::common::models::default_models::move_tables::{RawTableItem, TableItemConvertible}; #[derive( Allocative, Clone, Debug, Default, Deserialize, FieldCount, Serialize, ParquetRecordWriter, @@ -164,3 +165,21 @@ impl TableMetadata { } } } + + +// In parquet_move_tables.rs +impl TableItemConvertible for TableItem { + fn from_raw(raw_item: &RawTableItem) -> Self { + TableItem { + txn_version: raw_item.txn_version, + write_set_change_index: raw_item.write_set_change_index, + transaction_block_height: raw_item.transaction_block_height, + table_key: raw_item.table_key.clone(), + table_handle: raw_item.table_handle.clone(), + decoded_key: raw_item.decoded_key.clone(), + decoded_value: raw_item.decoded_value.clone(), + is_deleted: raw_item.is_deleted, + block_timestamp: raw_item.block_timestamp, + } + } +} \ No newline at end of file diff --git a/rust/processor/src/processors/default_processor.rs b/rust/processor/src/processors/default_processor.rs index 86da47c4c..260858ab3 100644 --- a/rust/processor/src/processors/default_processor.rs +++ b/rust/processor/src/processors/default_processor.rs @@ -29,6 +29,9 @@ use diesel::{ use std::fmt::Debug; use tokio::join; use tracing::error; +use crate::db::common::models::default_models::move_tables::RawTableItem; +use crate::db::common::models::default_models::move_tables::TableItemConvertible; + pub struct DefaultProcessor { connection_pool: ArcDbPool, @@ -216,10 +219,16 @@ impl ProcessorTrait for DefaultProcessor { let processing_start = std::time::Instant::now(); let last_transaction_timestamp = transactions.last().unwrap().timestamp.clone(); let flags = self.deprecated_tables; - let (block_metadata_transactions, table_items, current_table_items, table_metadata) = + let (block_metadata_transactions, raw_table_items, current_table_items, table_metadata) = tokio::task::spawn_blocking(move || process_transactions(transactions, flags)) .await .expect("Failed to spawn_blocking for TransactionModel::from_transactions"); + + let postgres_table_items: Vec = raw_table_items + .iter() + .map(TableItem::from_raw) + .collect(); + let processing_duration_in_secs = processing_start.elapsed().as_secs_f64(); let db_insertion_start = std::time::Instant::now(); @@ -229,7 +238,7 @@ impl ProcessorTrait for DefaultProcessor { start_version, end_version, &block_metadata_transactions, - (&table_items, ¤t_table_items, &table_metadata), + (&postgres_table_items, ¤t_table_items, &table_metadata), &self.per_table_chunk_sizes, ) .await; @@ -238,7 +247,7 @@ impl ProcessorTrait for DefaultProcessor { // make it faster. tokio::task::spawn(async move { drop(block_metadata_transactions); - drop(table_items); + drop(postgres_table_items); drop(current_table_items); drop(table_metadata); }); @@ -297,7 +306,7 @@ pub fn process_transactions( flags: TableFlags, ) -> ( Vec, - Vec, + Vec, Vec, Vec, ) { @@ -318,6 +327,10 @@ pub fn process_transactions( .info .as_ref() .expect("Transaction info doesn't exist!"); + + #[allow(deprecated)] + let block_timestamp = chrono::NaiveDateTime::from_timestamp_opt(timestamp.seconds, 0) + .expect("Txn Timestamp is invalid!"); let txn_data = match transaction.txn_data.as_ref() { Some(txn_data) => txn_data, None => { @@ -349,12 +362,19 @@ pub fn process_transactions( .expect("WriteSetChange must have a change") { WriteSetChangeEnum::WriteTableItem(inner) => { - let (ti, cti) = TableItem::from_write_table_item( + let (ti, cti) = RawTableItem::from_write_table_item( inner, index as i64, version, block_height, + block_timestamp, ); + // let (ti, cti) = TableItem::from_write_table_item( + // inner, + // index as i64, + // version, + // block_height, + // ); table_items.push(ti); current_table_items.insert( (cti.table_handle.clone(), cti.key_hash.clone()), @@ -366,11 +386,12 @@ pub fn process_transactions( ); }, WriteSetChangeEnum::DeleteTableItem(inner) => { - let (ti, cti) = TableItem::from_delete_table_item( + let (ti, cti) = RawTableItem::from_delete_table_item( inner, index as i64, version, block_height, + block_timestamp ); table_items.push(ti); current_table_items diff --git a/rust/processor/src/processors/parquet_processors/parquet_default_processor.rs b/rust/processor/src/processors/parquet_processors/parquet_default_processor.rs index 17495022b..d8fe6ab38 100644 --- a/rust/processor/src/processors/parquet_processors/parquet_default_processor.rs +++ b/rust/processor/src/processors/parquet_processors/parquet_default_processor.rs @@ -155,7 +155,7 @@ impl ProcessorTrait for ParquetDefaultProcessor { let ( (move_resources, write_set_changes, transactions, table_items, move_modules), transaction_version_to_struct_count, - ) = tokio::task::spawn_blocking(move || process_transactions(transactions)) + ) = tokio::task::spawn_blocking(move || process_transactions2(transactions)) .await .expect("Failed to spawn_blocking for TransactionModel::from_transactions"); @@ -213,7 +213,7 @@ impl ProcessorTrait for ParquetDefaultProcessor { } } -pub fn process_transactions( +pub fn process_transactions2( transactions: Vec, ) -> ( ( diff --git a/rust/sdk-processor/src/steps/default_processor/default_extractor.rs b/rust/sdk-processor/src/steps/default_processor/default_extractor.rs index e7428c839..ed317f87f 100644 --- a/rust/sdk-processor/src/steps/default_processor/default_extractor.rs +++ b/rust/sdk-processor/src/steps/default_processor/default_extractor.rs @@ -8,7 +8,7 @@ use async_trait::async_trait; use processor::{ db::common::models::default_models::{ block_metadata_transactions::BlockMetadataTransactionModel, - move_tables::{CurrentTableItem, TableItem, TableMetadata}, + move_tables::{CurrentTableItem, TableItem, TableItemConvertible, TableMetadata}, }, processors::default_processor::process_transactions, worker::TableFlags, @@ -47,13 +47,15 @@ impl Processable for DefaultExtractor { ProcessorError, > { let flags = self.deprecated_table_flags; - let (block_metadata_transactions, table_items, current_table_items, table_metadata) = + let (block_metadata_transactions, raw_table_items, current_table_items, table_metadata) = process_transactions(transactions.data, flags); + let postgres_table_items: Vec = + raw_table_items.iter().map(TableItem::from_raw).collect(); Ok(Some(TransactionContext { data: ( block_metadata_transactions, - table_items, + postgres_table_items, current_table_items, table_metadata, ), diff --git a/rust/sdk-processor/src/steps/mod.rs b/rust/sdk-processor/src/steps/mod.rs index 3dd4c9776..f41861537 100644 --- a/rust/sdk-processor/src/steps/mod.rs +++ b/rust/sdk-processor/src/steps/mod.rs @@ -5,7 +5,7 @@ pub mod default_processor; pub mod events_processor; pub mod fungible_asset_processor; pub mod objects_processor; -mod parquet_default_processor; +pub mod parquet_default_processor; pub mod stake_processor; pub mod token_v2_processor; pub mod user_transaction_processor; diff --git a/rust/sdk-processor/src/steps/parquet_default_processor/parquet_default_extractor.rs b/rust/sdk-processor/src/steps/parquet_default_processor/parquet_default_extractor.rs index 93dc7ac49..d5d81085b 100644 --- a/rust/sdk-processor/src/steps/parquet_default_processor/parquet_default_extractor.rs +++ b/rust/sdk-processor/src/steps/parquet_default_processor/parquet_default_extractor.rs @@ -2,7 +2,6 @@ use crate::{ parquet_processors::{ParquetTypeEnum, ParquetTypeStructs}, steps::common::parquet_extractor_helper::add_to_map_if_opted_in_for_backfill, }; -use ahash::AHashMap; use aptos_indexer_processor_sdk::{ aptos_protos::transaction::v1::Transaction, traits::{async_step::AsyncRunType, AsyncStep, NamedStep, Processable}, @@ -10,12 +9,15 @@ use aptos_indexer_processor_sdk::{ utils::errors::ProcessorError, }; use async_trait::async_trait; -use processor::db::common::models::default_models::{ - parquet_move_modules::MoveModule, - parquet_move_resources::MoveResource, - parquet_move_tables::TableItem, - parquet_transactions::TransactionModel, - parquet_write_set_changes::{WriteSetChangeDetail, WriteSetChangeModel}, +use processor::{ + db::common::models::default_models::{ + move_tables::TableItemConvertible, parquet_move_tables::TableItem, + }, + processors::{ + default_processor::process_transactions, + parquet_processors::parquet_default_processor::process_transactions2, + }, + worker::TableFlags, }; use std::collections::HashMap; use tracing::debug; @@ -40,16 +42,32 @@ impl Processable for ParquetDefaultExtractor { &mut self, transactions: TransactionContext, ) -> anyhow::Result>, ProcessorError> { - let (move_resources, write_set_changes, parquet_transactions, table_items, move_modules) = - process_transactions(transactions.data); + // TODO: remove struct_count map after migration + + let flags = TableFlags::empty(); + + let (_, raw_table_items, _, _) = process_transactions(transactions.data.clone(), flags); + + let parquet_table_items: Vec = + raw_table_items.iter().map(TableItem::from_raw).collect(); + let ( + ( + move_resources, + write_set_changes, + parquet_transactions, + _parquet_table_items, + move_modules, + ), + _transaction_version_to_struct_count, + ) = process_transactions2(transactions.data); // Print the size of each extracted data type - debug!("Processed data sizes:"); - debug!(" - MoveResources: {}", move_resources.len()); - debug!(" - WriteSetChanges: {}", write_set_changes.len()); - debug!(" - ParquetTransactions: {}", parquet_transactions.len()); - debug!(" - TableItems: {}", table_items.len()); - debug!(" - MoveModules: {}", move_modules.len()); + println!("Processed data sizes:"); + println!(" - MoveResources: {}", move_resources.len()); + println!(" - WriteSetChanges: {}", write_set_changes.len()); + println!(" - ParquetTransactions: {}", parquet_transactions.len()); + println!(" - TableItems: {}", parquet_table_items.len()); + println!(" - MoveModules: {}", move_modules.len()); let mut map: HashMap = HashMap::new(); @@ -69,7 +87,7 @@ impl Processable for ParquetDefaultExtractor { ), ( ParquetTypeEnum::TableItem, - ParquetTypeStructs::TableItem(table_items), + ParquetTypeStructs::TableItem(parquet_table_items), ), ( ParquetTypeEnum::MoveModule, @@ -94,49 +112,6 @@ impl Processable for ParquetDefaultExtractor { } } -pub fn process_transactions( - transactions: Vec, -) -> ( - Vec, - Vec, - Vec, - Vec, - Vec, -) { - // this will be removed in the future. - let mut transaction_version_to_struct_count = AHashMap::new(); - let (txns, _, write_set_changes, wsc_details) = TransactionModel::from_transactions( - &transactions, - &mut transaction_version_to_struct_count, - ); - - let mut move_modules = vec![]; - let mut move_resources = vec![]; - let mut table_items = vec![]; - - for detail in wsc_details { - match detail { - WriteSetChangeDetail::Module(module) => { - move_modules.push(module); - }, - WriteSetChangeDetail::Resource(resource) => { - move_resources.push(resource); - }, - WriteSetChangeDetail::Table(item, _, _) => { - table_items.push(item); - }, - } - } - - ( - move_resources, - write_set_changes, - txns, - table_items, - move_modules, - ) -} - impl AsyncStep for ParquetDefaultExtractor {} impl NamedStep for ParquetDefaultExtractor {