diff --git a/rust/processor/src/db/common/models/default_models/move_tables.rs b/rust/processor/src/db/common/models/default_models/move_tables.rs index e02a713c6..fe30a72da 100644 --- a/rust/processor/src/db/common/models/default_models/move_tables.rs +++ b/rust/processor/src/db/common/models/default_models/move_tables.rs @@ -47,6 +47,113 @@ pub struct TableMetadata { pub value_type: String, } +pub trait TableItemConvertible { + fn from_raw(raw_item: &RawTableItem) -> Self; +} + +pub struct RawTableItem { + pub txn_version: i64, + pub block_timestamp: chrono::NaiveDateTime, + pub write_set_change_index: i64, + pub transaction_block_height: i64, + pub table_key: String, + pub table_handle: String, + pub decoded_key: String, + pub decoded_value: Option, + pub is_deleted: bool, +} + +impl RawTableItem { + pub fn from_write_table_item( + write_table_item: &WriteTableItem, + write_set_change_index: i64, + txn_version: i64, + transaction_block_height: i64, + block_timestamp: chrono::NaiveDateTime, + ) -> (Self, CurrentTableItem) { + ( + Self { + txn_version, + write_set_change_index, + transaction_block_height, + table_key: write_table_item.key.to_string(), + table_handle: standardize_address(&write_table_item.handle.to_string()), + decoded_key: write_table_item.data.as_ref().unwrap().key.clone(), + decoded_value: Some(write_table_item.data.as_ref().unwrap().value.clone()), + is_deleted: false, + block_timestamp, + }, + CurrentTableItem { + table_handle: standardize_address(&write_table_item.handle.to_string()), + key_hash: hash_str(&write_table_item.key.to_string()), + key: write_table_item.key.to_string(), + decoded_key: serde_json::from_str( + write_table_item.data.as_ref().unwrap().key.as_str(), + ) + .unwrap(), + decoded_value: serde_json::from_str( + write_table_item.data.as_ref().unwrap().value.as_str(), + ) + .unwrap(), + last_transaction_version: txn_version, + is_deleted: false, + }, + ) + } + + pub fn from_delete_table_item( + delete_table_item: &DeleteTableItem, + write_set_change_index: i64, + txn_version: i64, + transaction_block_height: i64, + block_timestamp: chrono::NaiveDateTime, + ) -> (Self, CurrentTableItem) { + ( + Self { + txn_version, + write_set_change_index, + transaction_block_height, + table_key: delete_table_item.key.to_string(), + table_handle: standardize_address(&delete_table_item.handle.to_string()), + decoded_key: delete_table_item.data.as_ref().unwrap().key.clone(), + decoded_value: None, + is_deleted: true, + block_timestamp, + }, + CurrentTableItem { + table_handle: standardize_address(&delete_table_item.handle.to_string()), + key_hash: hash_str(&delete_table_item.key.to_string()), + key: delete_table_item.key.to_string(), + decoded_key: serde_json::from_str( + delete_table_item.data.as_ref().unwrap().key.as_str(), + ) + .unwrap(), + decoded_value: None, + last_transaction_version: txn_version, + is_deleted: true, + }, + ) + } +} + +impl TableItemConvertible for TableItem { + fn from_raw(raw_item: &RawTableItem) -> Self { + TableItem { + transaction_version: raw_item.txn_version, + write_set_change_index: raw_item.write_set_change_index, + transaction_block_height: raw_item.transaction_block_height, + key: raw_item.table_key.clone(), + table_handle: raw_item.table_handle.clone(), + decoded_key: serde_json::from_str(raw_item.decoded_key.as_str()).unwrap(), + decoded_value: raw_item + .decoded_value + .clone() + .map(|v| serde_json::from_str(v.as_str()).unwrap()), + is_deleted: raw_item.is_deleted, + } + } +} + impl TableItem { pub fn from_write_table_item( write_table_item: &WriteTableItem, diff --git a/rust/processor/src/db/common/models/default_models/parquet_move_tables.rs b/rust/processor/src/db/common/models/default_models/parquet_move_tables.rs index 1dc8da47c..e196362fd 100644 --- a/rust/processor/src/db/common/models/default_models/parquet_move_tables.rs +++ b/rust/processor/src/db/common/models/default_models/parquet_move_tables.rs @@ -5,6 +5,7 @@ use crate::{ bq_analytics::generic_parquet_processor::{GetTimeStamp, HasVersion, NamedTable}, + db::common::models::default_models::move_tables::{RawTableItem, TableItemConvertible}, utils::util::{hash_str, standardize_address}, }; use allocative_derive::Allocative; @@ -164,3 +165,19 @@ impl TableMetadata { } } } + +impl TableItemConvertible for TableItem { + fn from_raw(raw_item: &RawTableItem) -> Self { + TableItem { + txn_version: raw_item.txn_version, + write_set_change_index: raw_item.write_set_change_index, + transaction_block_height: raw_item.transaction_block_height, + table_key: raw_item.table_key.clone(), + table_handle: raw_item.table_handle.clone(), + decoded_key: raw_item.decoded_key.clone(), + decoded_value: raw_item.decoded_value.clone(), + is_deleted: raw_item.is_deleted, + block_timestamp: raw_item.block_timestamp, + } + } +} diff --git a/rust/processor/src/processors/default_processor.rs b/rust/processor/src/processors/default_processor.rs index 86da47c4c..b849b6dd3 100644 --- a/rust/processor/src/processors/default_processor.rs +++ b/rust/processor/src/processors/default_processor.rs @@ -5,7 +5,9 @@ use super::{DefaultProcessingResult, ProcessorName, ProcessorTrait}; use crate::{ db::common::models::default_models::{ block_metadata_transactions::BlockMetadataTransactionModel, - move_tables::{CurrentTableItem, TableItem, TableMetadata}, + move_tables::{ + CurrentTableItem, RawTableItem, TableItem, TableItemConvertible, TableMetadata, + }, }, gap_detectors::ProcessingResult, schema, @@ -216,10 +218,14 @@ impl ProcessorTrait for DefaultProcessor { let processing_start = std::time::Instant::now(); let last_transaction_timestamp = transactions.last().unwrap().timestamp.clone(); let flags = self.deprecated_tables; - let (block_metadata_transactions, table_items, current_table_items, table_metadata) = + let (block_metadata_transactions, raw_table_items, current_table_items, table_metadata) = tokio::task::spawn_blocking(move || process_transactions(transactions, flags)) .await .expect("Failed to spawn_blocking for TransactionModel::from_transactions"); + + let postgres_table_items: Vec = + raw_table_items.iter().map(TableItem::from_raw).collect(); + let processing_duration_in_secs = processing_start.elapsed().as_secs_f64(); let db_insertion_start = std::time::Instant::now(); @@ -229,7 +235,7 @@ impl ProcessorTrait for DefaultProcessor { start_version, end_version, &block_metadata_transactions, - (&table_items, ¤t_table_items, &table_metadata), + (&postgres_table_items, ¤t_table_items, &table_metadata), &self.per_table_chunk_sizes, ) .await; @@ -238,7 +244,7 @@ impl ProcessorTrait for DefaultProcessor { // make it faster. tokio::task::spawn(async move { drop(block_metadata_transactions); - drop(table_items); + drop(postgres_table_items); drop(current_table_items); drop(table_metadata); }); @@ -272,6 +278,7 @@ impl ProcessorTrait for DefaultProcessor { } } +// TODO: we can further optimize this by passing in a falg to selectively parse only the required data (e.g. table_items for parquet) /// Processes a list of transactions and extracts relevant data into different models. /// /// This function iterates over a list of transactions, extracting block metadata transactions, @@ -297,7 +304,7 @@ pub fn process_transactions( flags: TableFlags, ) -> ( Vec, - Vec, + Vec, Vec, Vec, ) { @@ -318,6 +325,10 @@ pub fn process_transactions( .info .as_ref() .expect("Transaction info doesn't exist!"); + + #[allow(deprecated)] + let block_timestamp = chrono::NaiveDateTime::from_timestamp_opt(timestamp.seconds, 0) + .expect("Txn Timestamp is invalid!"); let txn_data = match transaction.txn_data.as_ref() { Some(txn_data) => txn_data, None => { @@ -349,11 +360,12 @@ pub fn process_transactions( .expect("WriteSetChange must have a change") { WriteSetChangeEnum::WriteTableItem(inner) => { - let (ti, cti) = TableItem::from_write_table_item( + let (ti, cti) = RawTableItem::from_write_table_item( inner, index as i64, version, block_height, + block_timestamp, ); table_items.push(ti); current_table_items.insert( @@ -366,11 +378,12 @@ pub fn process_transactions( ); }, WriteSetChangeEnum::DeleteTableItem(inner) => { - let (ti, cti) = TableItem::from_delete_table_item( + let (ti, cti) = RawTableItem::from_delete_table_item( inner, index as i64, version, block_height, + block_timestamp, ); table_items.push(ti); current_table_items @@ -391,9 +404,11 @@ pub fn process_transactions( .sort_by(|a, b| (&a.table_handle, &a.key_hash).cmp(&(&b.table_handle, &b.key_hash))); table_metadata.sort_by(|a, b| a.handle.cmp(&b.handle)); + // TODO: remove this, since we are not going to deprecate this anytime soon? if flags.contains(TableFlags::TABLE_ITEMS) { table_items.clear(); } + // TODO: migrate to Parquet if flags.contains(TableFlags::TABLE_METADATAS) { table_metadata.clear(); } diff --git a/rust/processor/src/processors/parquet_processors/parquet_default_processor.rs b/rust/processor/src/processors/parquet_processors/parquet_default_processor.rs index 17495022b..f00d53fb9 100644 --- a/rust/processor/src/processors/parquet_processors/parquet_default_processor.rs +++ b/rust/processor/src/processors/parquet_processors/parquet_default_processor.rs @@ -155,7 +155,7 @@ impl ProcessorTrait for ParquetDefaultProcessor { let ( (move_resources, write_set_changes, transactions, table_items, move_modules), transaction_version_to_struct_count, - ) = tokio::task::spawn_blocking(move || process_transactions(transactions)) + ) = tokio::task::spawn_blocking(move || process_transactions_parquet(transactions)) .await .expect("Failed to spawn_blocking for TransactionModel::from_transactions"); @@ -213,7 +213,8 @@ impl ProcessorTrait for ParquetDefaultProcessor { } } -pub fn process_transactions( +// TODO: Remove transaction_version_to_struct_count after migration +pub fn process_transactions_parquet( transactions: Vec, ) -> ( ( diff --git a/rust/sdk-processor/src/steps/common/mod.rs b/rust/sdk-processor/src/steps/common/mod.rs index 0e291f4b2..6b2e05793 100644 --- a/rust/sdk-processor/src/steps/common/mod.rs +++ b/rust/sdk-processor/src/steps/common/mod.rs @@ -1,4 +1,3 @@ -pub mod parquet_extractor_helper; pub mod processor_status_saver; pub use processor_status_saver::get_processor_status_saver; diff --git a/rust/sdk-processor/src/steps/default_processor/default_extractor.rs b/rust/sdk-processor/src/steps/default_processor/default_extractor.rs index e7428c839..ed317f87f 100644 --- a/rust/sdk-processor/src/steps/default_processor/default_extractor.rs +++ b/rust/sdk-processor/src/steps/default_processor/default_extractor.rs @@ -8,7 +8,7 @@ use async_trait::async_trait; use processor::{ db::common::models::default_models::{ block_metadata_transactions::BlockMetadataTransactionModel, - move_tables::{CurrentTableItem, TableItem, TableMetadata}, + move_tables::{CurrentTableItem, TableItem, TableItemConvertible, TableMetadata}, }, processors::default_processor::process_transactions, worker::TableFlags, @@ -47,13 +47,15 @@ impl Processable for DefaultExtractor { ProcessorError, > { let flags = self.deprecated_table_flags; - let (block_metadata_transactions, table_items, current_table_items, table_metadata) = + let (block_metadata_transactions, raw_table_items, current_table_items, table_metadata) = process_transactions(transactions.data, flags); + let postgres_table_items: Vec = + raw_table_items.iter().map(TableItem::from_raw).collect(); Ok(Some(TransactionContext { data: ( block_metadata_transactions, - table_items, + postgres_table_items, current_table_items, table_metadata, ), diff --git a/rust/sdk-processor/src/steps/mod.rs b/rust/sdk-processor/src/steps/mod.rs index 3dd4c9776..f41861537 100644 --- a/rust/sdk-processor/src/steps/mod.rs +++ b/rust/sdk-processor/src/steps/mod.rs @@ -5,7 +5,7 @@ pub mod default_processor; pub mod events_processor; pub mod fungible_asset_processor; pub mod objects_processor; -mod parquet_default_processor; +pub mod parquet_default_processor; pub mod stake_processor; pub mod token_v2_processor; pub mod user_transaction_processor; diff --git a/rust/sdk-processor/src/steps/parquet_default_processor/parquet_default_extractor.rs b/rust/sdk-processor/src/steps/parquet_default_processor/parquet_default_extractor.rs index 93dc7ac49..a709e7615 100644 --- a/rust/sdk-processor/src/steps/parquet_default_processor/parquet_default_extractor.rs +++ b/rust/sdk-processor/src/steps/parquet_default_processor/parquet_default_extractor.rs @@ -1,8 +1,7 @@ use crate::{ parquet_processors::{ParquetTypeEnum, ParquetTypeStructs}, - steps::common::parquet_extractor_helper::add_to_map_if_opted_in_for_backfill, + utils::parquet_extractor_helper::add_to_map_if_opted_in_for_backfill, }; -use ahash::AHashMap; use aptos_indexer_processor_sdk::{ aptos_protos::transaction::v1::Transaction, traits::{async_step::AsyncRunType, AsyncStep, NamedStep, Processable}, @@ -10,12 +9,15 @@ use aptos_indexer_processor_sdk::{ utils::errors::ProcessorError, }; use async_trait::async_trait; -use processor::db::common::models::default_models::{ - parquet_move_modules::MoveModule, - parquet_move_resources::MoveResource, - parquet_move_tables::TableItem, - parquet_transactions::TransactionModel, - parquet_write_set_changes::{WriteSetChangeDetail, WriteSetChangeModel}, +use processor::{ + db::common::models::default_models::{ + move_tables::TableItemConvertible, parquet_move_tables::TableItem, + }, + processors::{ + default_processor::process_transactions, + parquet_processors::parquet_default_processor::process_transactions_parquet, + }, + worker::TableFlags, }; use std::collections::HashMap; use tracing::debug; @@ -25,7 +27,7 @@ pub struct ParquetDefaultExtractor where Self: Processable + Send + Sized + 'static, { - pub opt_in_tables: Option>, + pub opt_in_tables: TableFlags, } type ParquetTypeMap = HashMap; @@ -40,46 +42,72 @@ impl Processable for ParquetDefaultExtractor { &mut self, transactions: TransactionContext, ) -> anyhow::Result>, ProcessorError> { - let (move_resources, write_set_changes, parquet_transactions, table_items, move_modules) = - process_transactions(transactions.data); + // TODO: remove struct_count map after migration + + let flags = TableFlags::empty(); + let (_, raw_table_items, _, _) = process_transactions(transactions.data.clone(), flags); + + let parquet_table_items: Vec = + raw_table_items.iter().map(TableItem::from_raw).collect(); + let ( + ( + move_resources, + write_set_changes, + parquet_transactions, + _parquet_table_items, + move_modules, + ), + _transaction_version_to_struct_count, + ) = process_transactions_parquet(transactions.data); // Print the size of each extracted data type - debug!("Processed data sizes:"); - debug!(" - MoveResources: {}", move_resources.len()); - debug!(" - WriteSetChanges: {}", write_set_changes.len()); - debug!(" - ParquetTransactions: {}", parquet_transactions.len()); - debug!(" - TableItems: {}", table_items.len()); - debug!(" - MoveModules: {}", move_modules.len()); + println!("Processed data sizes:"); + println!(" - MoveResources: {}", move_resources.len()); + println!(" - WriteSetChanges: {}", write_set_changes.len()); + println!(" - ParquetTransactions: {}", parquet_transactions.len()); + println!(" - TableItems: {}", parquet_table_items.len()); + println!(" - MoveModules: {}", move_modules.len()); let mut map: HashMap = HashMap::new(); - // Array of tuples for each data type and its corresponding enum variant + // Array of tuples for each data type and its corresponding enum variant and flag let data_types = [ ( + TableFlags::MOVE_RESOURCES, ParquetTypeEnum::MoveResource, ParquetTypeStructs::MoveResource(move_resources), ), ( + TableFlags::WRITE_SET_CHANGES, ParquetTypeEnum::WriteSetChange, ParquetTypeStructs::WriteSetChange(write_set_changes), ), ( + TableFlags::TRANSACTIONS, ParquetTypeEnum::Transaction, ParquetTypeStructs::Transaction(parquet_transactions), ), ( + TableFlags::TABLE_ITEMS, ParquetTypeEnum::TableItem, - ParquetTypeStructs::TableItem(table_items), + ParquetTypeStructs::TableItem(parquet_table_items), ), ( + TableFlags::MOVE_MODULES, ParquetTypeEnum::MoveModule, ParquetTypeStructs::MoveModule(move_modules), ), ]; // Populate the map based on opt-in tables - for (enum_type, data) in data_types { - add_to_map_if_opted_in_for_backfill(&self.opt_in_tables, &mut map, enum_type, data); + for (table_flag, enum_type, data) in data_types { + add_to_map_if_opted_in_for_backfill( + self.opt_in_tables, + &mut map, + table_flag, + enum_type, + data, + ); } debug!( @@ -94,49 +122,6 @@ impl Processable for ParquetDefaultExtractor { } } -pub fn process_transactions( - transactions: Vec, -) -> ( - Vec, - Vec, - Vec, - Vec, - Vec, -) { - // this will be removed in the future. - let mut transaction_version_to_struct_count = AHashMap::new(); - let (txns, _, write_set_changes, wsc_details) = TransactionModel::from_transactions( - &transactions, - &mut transaction_version_to_struct_count, - ); - - let mut move_modules = vec![]; - let mut move_resources = vec![]; - let mut table_items = vec![]; - - for detail in wsc_details { - match detail { - WriteSetChangeDetail::Module(module) => { - move_modules.push(module); - }, - WriteSetChangeDetail::Resource(resource) => { - move_resources.push(resource); - }, - WriteSetChangeDetail::Table(item, _, _) => { - table_items.push(item); - }, - } - } - - ( - move_resources, - write_set_changes, - txns, - table_items, - move_modules, - ) -} - impl AsyncStep for ParquetDefaultExtractor {} impl NamedStep for ParquetDefaultExtractor { diff --git a/rust/sdk-processor/src/utils/mod.rs b/rust/sdk-processor/src/utils/mod.rs index b829042b8..252b6f0b2 100644 --- a/rust/sdk-processor/src/utils/mod.rs +++ b/rust/sdk-processor/src/utils/mod.rs @@ -1,3 +1,4 @@ pub mod chain_id; pub mod database; +pub mod parquet_extractor_helper; pub mod starting_version; diff --git a/rust/sdk-processor/src/steps/common/parquet_extractor_helper.rs b/rust/sdk-processor/src/utils/parquet_extractor_helper.rs similarity index 55% rename from rust/sdk-processor/src/steps/common/parquet_extractor_helper.rs rename to rust/sdk-processor/src/utils/parquet_extractor_helper.rs index cc7e31b41..7f6e95683 100644 --- a/rust/sdk-processor/src/steps/common/parquet_extractor_helper.rs +++ b/rust/sdk-processor/src/utils/parquet_extractor_helper.rs @@ -1,20 +1,17 @@ use crate::parquet_processors::{ParquetTypeEnum, ParquetTypeStructs}; +use processor::worker::TableFlags; use std::collections::HashMap; /// Fill the map with data if the table is opted in for backfill-purpose pub fn add_to_map_if_opted_in_for_backfill( - opt_in_tables: &Option>, + opt_in_tables: TableFlags, map: &mut HashMap, + table_flag: TableFlags, enum_type: ParquetTypeEnum, data: ParquetTypeStructs, ) { - if let Some(ref backfill_table) = opt_in_tables { - let table_name = enum_type.to_string(); - if backfill_table.contains(&table_name) { - map.insert(enum_type, data); - } - } else { - // If there's no opt-in table, include all data + // Add to map if all tables are opted-in (empty) or if the specific flag is set + if opt_in_tables.is_empty() || opt_in_tables.contains(table_flag) { map.insert(enum_type, data); } }