diff --git a/rust/Cargo.lock b/rust/Cargo.lock index d77bc100c..a42d4a762 100644 --- a/rust/Cargo.lock +++ b/rust/Cargo.lock @@ -364,9 +364,9 @@ checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a" [[package]] name = "bitflags" -version = "2.3.3" +version = "2.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "630be753d4e58660abd17930c71b647fe46c27ea6b63cc59e1e3851406972e42" +checksum = "cf4b9d6a944f767f8e5e0db018570623c85f3d925ac718db4e06d0187adb21c1" [[package]] name = "block-buffer" @@ -622,7 +622,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "62c6fcf842f17f8c78ecf7c81d75c5ce84436b41ee07e03f490fbb5f5a8731d8" dependencies = [ "bigdecimal", - "bitflags 2.3.3", + "bitflags 2.5.0", "byteorder", "chrono", "diesel_derives", @@ -1817,7 +1817,7 @@ version = "0.10.62" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8cde4d2d9200ad5909f8dac647e29482e07c3a35de8a13fce7c9c7747ad9f671" dependencies = [ - "bitflags 2.3.3", + "bitflags 2.5.0", "cfg-if", "foreign-types 0.3.2", "libc", @@ -2071,6 +2071,7 @@ dependencies = [ "async-trait", "bcs", "bigdecimal", + "bitflags 2.5.0", "chrono", "clap", "diesel", @@ -2454,7 +2455,7 @@ version = "0.38.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0a962918ea88d644592894bc6dc55acc6c0956488adcebbfb6e273506b7fd6e5" dependencies = [ - "bitflags 2.3.3", + "bitflags 2.5.0", "errno", "libc", "linux-raw-sys 0.4.3", diff --git a/rust/Cargo.toml b/rust/Cargo.toml index 2f984cd89..ad99033bd 100644 --- a/rust/Cargo.toml +++ b/rust/Cargo.toml @@ -27,6 +27,7 @@ base64 = "0.13.0" bb8 = "0.8.1" bcs = { git = "https://github.com/aptos-labs/bcs.git", rev = "d31fab9d81748e2594be5cd5cdf845786a30562d" } bigdecimal = { version = "0.4.0", features = ["serde"] } +bitflags = "2.5.0" chrono = { version = "0.4.19", features = ["clock", "serde"] } clap = { version = "4.3.5", features = ["derive", "unstable-styles"] } # Do NOT enable the postgres feature here, it is conditionally enabled in a feature diff --git a/rust/processor/Cargo.toml b/rust/processor/Cargo.toml index 84d2ee3f1..07c923433 100644 --- a/rust/processor/Cargo.toml +++ b/rust/processor/Cargo.toml @@ -20,6 +20,7 @@ aptos-protos = { workspace = true } async-trait = { workspace = true } bcs = { workspace = true } bigdecimal = { workspace = true } +bitflags = { workspace = true } chrono = { workspace = true } clap = { workspace = true } diesel = { workspace = true } diff --git a/rust/processor/README.md b/rust/processor/README.md index b7aa97869..d1b259747 100644 --- a/rust/processor/README.md +++ b/rust/processor/README.md @@ -36,6 +36,9 @@ Indexer GRPC parser is to indexer data processor that leverages the indexer grpc - "0x07" # Skip all transactions that aren't user transactions focus_user_transactions: false + deprecated_tables: [ + "MOVE_RESOURCES", + ] ``` #### Config Explanation diff --git a/rust/processor/src/config.rs b/rust/processor/src/config.rs index 5a9626b2a..9d2d69c73 100644 --- a/rust/processor/src/config.rs +++ b/rust/processor/src/config.rs @@ -9,7 +9,7 @@ use ahash::AHashMap; use anyhow::{Context, Result}; use serde::{Deserialize, Serialize}; use server_framework::RunnableConfig; -use std::time::Duration; +use std::{collections::HashSet, time::Duration}; use url::Url; pub const QUERY_DEFAULT_RETRIES: u32 = 5; @@ -49,6 +49,8 @@ pub struct IndexerGrpcProcessorConfig { #[serde(default)] pub transaction_filter: TransactionFilter, + // String vector for deprecated tables to skip db writes + pub deprecated_tables: HashSet, } impl IndexerGrpcProcessorConfig { @@ -95,6 +97,7 @@ impl RunnableConfig for IndexerGrpcProcessorConfig { self.enable_verbose_logging, self.transaction_filter.clone(), self.grpc_response_item_timeout_in_secs, + self.deprecated_tables.clone(), ) .await .context("Failed to build worker")?; diff --git a/rust/processor/src/db/common/models/default_models/transactions.rs b/rust/processor/src/db/common/models/default_models/transactions.rs index dade5448f..36f0d7a4b 100644 --- a/rust/processor/src/db/common/models/default_models/transactions.rs +++ b/rust/processor/src/db/common/models/default_models/transactions.rs @@ -186,13 +186,15 @@ impl Transaction { .timestamp .as_ref() .expect("Transaction timestamp doesn't exist!"); + + let (wsc, wsc_detail) = WriteSetChangeModel::from_write_set_changes( + &transaction_info.changes, + version, + block_height, + ); + match txn_data { TxnData::User(user_txn) => { - let (wsc, wsc_detail) = WriteSetChangeModel::from_write_set_changes( - &transaction_info.changes, - version, - block_height, - ); let payload = user_txn .request .as_ref() @@ -202,7 +204,6 @@ impl Transaction { .expect("Getting payload failed."); let payload_cleaned = get_clean_payload(payload, version); let payload_type = get_payload_type(payload); - ( Self::from_transaction_info_with_data( transaction_info, @@ -220,11 +221,6 @@ impl Transaction { ) }, TxnData::Genesis(genesis_txn) => { - let (wsc, wsc_detail) = WriteSetChangeModel::from_write_set_changes( - &transaction_info.changes, - version, - block_height, - ); let payload = genesis_txn.payload.as_ref().unwrap(); let payload_cleaned = get_clean_writeset(payload, version); // It's genesis so no big deal @@ -245,34 +241,27 @@ impl Transaction { wsc_detail, ) }, - TxnData::BlockMetadata(block_metadata_txn) => { - let (wsc, wsc_detail) = WriteSetChangeModel::from_write_set_changes( - &transaction_info.changes, + TxnData::BlockMetadata(block_metadata_txn) => ( + Self::from_transaction_info_with_data( + transaction_info, + None, + None, version, + transaction_type, + block_metadata_txn.events.len() as i64, block_height, - ); - ( - Self::from_transaction_info_with_data( - transaction_info, - None, - None, - version, - transaction_type, - block_metadata_txn.events.len() as i64, - block_height, - epoch, - ), - Some(BlockMetadataTransaction::from_transaction( - block_metadata_txn, - version, - block_height, - epoch, - timestamp, - )), - wsc, - wsc_detail, - ) - }, + epoch, + ), + Some(BlockMetadataTransaction::from_transaction( + block_metadata_txn, + version, + block_height, + epoch, + timestamp, + )), + wsc, + wsc_detail, + ), TxnData::StateCheckpoint(_) => ( Self::from_transaction_info_with_data( transaction_info, diff --git a/rust/processor/src/db/common/models/default_models/write_set_changes.rs b/rust/processor/src/db/common/models/default_models/write_set_changes.rs index 79d271fcb..c28a97c51 100644 --- a/rust/processor/src/db/common/models/default_models/write_set_changes.rs +++ b/rust/processor/src/db/common/models/default_models/write_set_changes.rs @@ -44,6 +44,7 @@ impl WriteSetChange { .change .as_ref() .expect("WriteSetChange must have a change"); + match change { WriteSetChangeEnum::WriteModule(inner) => ( Self { diff --git a/rust/processor/src/processors/default_processor.rs b/rust/processor/src/processors/default_processor.rs index 5d38f36bd..017d434ee 100644 --- a/rust/processor/src/processors/default_processor.rs +++ b/rust/processor/src/processors/default_processor.rs @@ -13,6 +13,7 @@ use crate::{ }, schema, utils::database::{execute_in_chunks, get_config_table_chunk_size, ArcDbPool}, + worker::TableFlags, }; use ahash::AHashMap; use anyhow::bail; @@ -30,13 +31,19 @@ use tracing::error; pub struct DefaultProcessor { connection_pool: ArcDbPool, per_table_chunk_sizes: AHashMap, + deprecated_tables: TableFlags, } impl DefaultProcessor { - pub fn new(connection_pool: ArcDbPool, per_table_chunk_sizes: AHashMap) -> Self { + pub fn new( + connection_pool: ArcDbPool, + per_table_chunk_sizes: AHashMap, + deprecated_tables: TableFlags, + ) -> Self { Self { connection_pool, per_table_chunk_sizes, + deprecated_tables, } } } @@ -82,6 +89,7 @@ async fn insert_to_db( txns, get_config_table_chunk_size::("transactions", per_table_chunk_sizes), ); + let bmt_res = execute_in_chunks( conn.clone(), insert_block_metadata_transactions_query, @@ -91,6 +99,7 @@ async fn insert_to_db( per_table_chunk_sizes, ), ); + let wst_res = execute_in_chunks( conn.clone(), insert_write_set_changes_query, @@ -100,6 +109,7 @@ async fn insert_to_db( per_table_chunk_sizes, ), ); + let mm_res = execute_in_chunks( conn.clone(), insert_move_modules_query, @@ -313,15 +323,15 @@ impl ProcessorTrait for DefaultProcessor { ) -> anyhow::Result { let processing_start = std::time::Instant::now(); let last_transaction_timestamp = transactions.last().unwrap().timestamp.clone(); + let flags = self.deprecated_tables; let ( txns, block_metadata_transactions, write_set_changes, (move_modules, move_resources, table_items, current_table_items, table_metadata), - ) = tokio::task::spawn_blocking(move || process_transactions(transactions)) + ) = tokio::task::spawn_blocking(move || process_transactions(transactions, flags)) .await .expect("Failed to spawn_blocking for TransactionModel::from_transactions"); - let processing_duration_in_secs = processing_start.elapsed().as_secs_f64(); let db_insertion_start = std::time::Instant::now(); @@ -373,6 +383,7 @@ impl ProcessorTrait for DefaultProcessor { fn process_transactions( transactions: Vec, + flags: TableFlags, ) -> ( Vec, Vec, @@ -385,7 +396,7 @@ fn process_transactions( Vec, ), ) { - let (txns, block_metadata_txns, write_set_changes, wsc_details) = + let (mut txns, block_metadata_txns, mut write_set_changes, wsc_details) = TransactionModel::from_transactions(&transactions); let mut block_metadata_transactions = vec![]; for block_metadata_txn in block_metadata_txns { @@ -426,6 +437,16 @@ fn process_transactions( .sort_by(|a, b| (&a.table_handle, &a.key_hash).cmp(&(&b.table_handle, &b.key_hash))); table_metadata.sort_by(|a, b| a.handle.cmp(&b.handle)); + if flags.contains(TableFlags::MOVE_RESOURCES) { + move_resources.clear(); + } + if flags.contains(TableFlags::TRANSACTIONS) { + txns.clear(); + } + if flags.contains(TableFlags::WRITE_SET_CHANGES) { + write_set_changes.clear(); + } + ( txns, block_metadata_transactions, diff --git a/rust/processor/src/worker.rs b/rust/processor/src/worker.rs index 4289a9879..c1cf021ce 100644 --- a/rust/processor/src/worker.rs +++ b/rust/processor/src/worker.rs @@ -37,6 +37,8 @@ use crate::{ use ahash::AHashMap; use anyhow::{Context, Result}; use aptos_moving_average::MovingAverage; +use bitflags::bitflags; +use std::collections::HashSet; use tokio::task::JoinHandle; use tracing::{debug, error, info}; use url::Url; @@ -47,6 +49,15 @@ use url::Url; pub const BUFFER_SIZE: usize = 100; pub const PROCESSOR_SERVICE_TYPE: &str = "processor"; +bitflags! { + #[derive(Debug, Clone, Copy)] + pub struct TableFlags: u64 { + const TRANSACTIONS = 1; + const WRITE_SET_CHANGES = 2; + const MOVE_RESOURCES = 4; + } +} + pub struct Worker { pub db_pool: ArcDbPool, pub processor_config: ProcessorConfig, @@ -64,6 +75,7 @@ pub struct Worker { pub enable_verbose_logging: Option, pub transaction_filter: TransactionFilter, pub grpc_response_item_timeout_in_secs: u64, + pub deprecated_tables: TableFlags, } impl Worker { @@ -85,6 +97,7 @@ impl Worker { enable_verbose_logging: Option, transaction_filter: TransactionFilter, grpc_response_item_timeout_in_secs: u64, + deprecated_tables: HashSet, ) -> Result { let processor_name = processor_config.name(); info!(processor_name = processor_name, "[Parser] Kicking off"); @@ -103,6 +116,14 @@ impl Worker { "[Parser] Finish creating the connection pool" ); let number_concurrent_processing_tasks = number_concurrent_processing_tasks.unwrap_or(10); + + let mut deprecated_tables_flags = TableFlags::empty(); + for table in deprecated_tables.iter() { + if let Some(flags) = TableFlags::from_name(table) { + deprecated_tables_flags |= flags; + } + } + Ok(Self { db_pool: conn_pool, processor_config, @@ -120,6 +141,7 @@ impl Worker { enable_verbose_logging, transaction_filter, grpc_response_item_timeout_in_secs, + deprecated_tables: deprecated_tables_flags, }) } @@ -240,6 +262,7 @@ impl Worker { let processor = build_processor( &self.processor_config, self.per_table_chunk_sizes.clone(), + self.deprecated_tables, self.db_pool.clone(), ); tokio::spawn(async move { @@ -304,6 +327,7 @@ impl Worker { let processor = build_processor( &self.processor_config, self.per_table_chunk_sizes.clone(), + self.deprecated_tables, self.db_pool.clone(), ); @@ -720,6 +744,7 @@ pub async fn do_processor( pub fn build_processor( config: &ProcessorConfig, per_table_chunk_sizes: AHashMap, + deprecated_tables: TableFlags, db_pool: ArcDbPool, ) -> Processor { match config { @@ -734,9 +759,11 @@ pub fn build_processor( ProcessorConfig::CoinProcessor => { Processor::from(CoinProcessor::new(db_pool, per_table_chunk_sizes)) }, - ProcessorConfig::DefaultProcessor => { - Processor::from(DefaultProcessor::new(db_pool, per_table_chunk_sizes)) - }, + ProcessorConfig::DefaultProcessor => Processor::from(DefaultProcessor::new( + db_pool, + per_table_chunk_sizes, + deprecated_tables, + )), ProcessorConfig::EventsProcessor => { Processor::from(EventsProcessor::new(db_pool, per_table_chunk_sizes)) },