diff --git a/Cargo.lock b/Cargo.lock index a6633a44e1df2..b964bc696167e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2013,6 +2013,7 @@ dependencies = [ "aptos-metrics-core", "aptos-moving-average 0.1.0 (git+https://github.com/aptos-labs/aptos-indexer-processors.git?rev=4801acae7aea30d7e96bbfbe5ec5b04056dfa4cf)", "aptos-protos 1.3.0", + "aptos-transaction-filter", "async-trait", "clap 4.4.14", "futures", @@ -10188,9 +10189,9 @@ dependencies = [ [[package]] name = "memchr" -version = "2.7.1" +version = "2.7.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "523dc4f511e55ab87b694dc30d0f820d60906ef06413f93d4d7a1385599cc149" +checksum = "78ca9ab1a0babb1e7d5695e3530886289c18cf2f87ec19a575a0abdce112e3a3" [[package]] name = "memmap2" @@ -15430,18 +15431,18 @@ checksum = "222a222a5bfe1bba4a77b45ec488a741b3cb8872e5e499451fd7d0129c9c7c3d" [[package]] name = "thiserror" -version = "1.0.56" +version = "1.0.61" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d54378c645627613241d077a3a79db965db602882668f9136ac42af9ecb730ad" +checksum = "c546c80d6be4bc6a00c0f01730c08df82eaa7a7a61f11d656526506112cc1709" dependencies = [ "thiserror-impl", ] [[package]] name = "thiserror-impl" -version = "1.0.56" +version = "1.0.61" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fa0faa943b50f3db30a20aa7e265dbc66076993efed8463e8de414e5d06d3471" +checksum = "46c3384250002a6d5af4d114f2845d37b57521033f30d5c3f46c4d70e1197533" dependencies = [ "proc-macro2", "quote", diff --git a/ecosystem/indexer-grpc/indexer-grpc-data-service/Cargo.toml b/ecosystem/indexer-grpc/indexer-grpc-data-service/Cargo.toml index c378e10000297..ae70ba89314fe 100644 --- a/ecosystem/indexer-grpc/indexer-grpc-data-service/Cargo.toml +++ b/ecosystem/indexer-grpc/indexer-grpc-data-service/Cargo.toml @@ -19,6 +19,7 @@ aptos-indexer-grpc-utils = { workspace = true } aptos-metrics-core = { workspace = true } aptos-moving-average = { workspace = true 
} aptos-protos = { workspace = true } +aptos-transaction-filter = { workspace = true } async-trait = { workspace = true } clap = { workspace = true } futures = { workspace = true } diff --git a/ecosystem/indexer-grpc/indexer-grpc-data-service/src/config.rs b/ecosystem/indexer-grpc/indexer-grpc-data-service/src/config.rs index 797154c86cf8b..c5f621fcde703 100644 --- a/ecosystem/indexer-grpc/indexer-grpc-data-service/src/config.rs +++ b/ecosystem/indexer-grpc/indexer-grpc-data-service/src/config.rs @@ -13,8 +13,9 @@ use aptos_protos::{ transaction::v1::FILE_DESCRIPTOR_SET as TRANSACTION_V1_TESTING_FILE_DESCRIPTOR_SET, util::timestamp::FILE_DESCRIPTOR_SET as UTIL_TIMESTAMP_FILE_DESCRIPTOR_SET, }; +use aptos_transaction_filter::BooleanTransactionFilter; use serde::{Deserialize, Serialize}; -use std::{collections::HashSet, net::SocketAddr, sync::Arc}; +use std::{net::SocketAddr, sync::Arc}; use tonic::{codec::CompressionEncoding, transport::Server}; pub const SERVER_NAME: &str = "idxdatasvc"; @@ -69,9 +70,18 @@ pub struct IndexerGrpcDataServiceConfig { pub enable_cache_compression: bool, #[serde(default)] pub in_memory_cache_config: InMemoryCacheConfig, - /// Sender addresses to ignore. Transactions from these addresses will not be indexed. - #[serde(default = "IndexerGrpcDataServiceConfig::default_sender_addresses_to_ignore")] - pub sender_addresses_to_ignore: Vec, + /// Any transaction that matches this filter will be stripped. This means we remove + /// the payload, signature, events, and writesets from it before sending it + /// downstream. This should only be used in an emergency situation, e.g. when txns + /// related to a certain module are too large and are causing issues for the data + /// service. 
Learn more here: + /// + /// https://www.notion.so/aptoslabs/Runbook-c006a37259394ac2ba904d6b54d180fa?pvs=4#171c210964ec42a89574fc80154f9e85 + /// + /// Generally you will want to start this with an OR, and then list out + /// separate filters that describe each type of txn we want to strip. + #[serde(default = "IndexerGrpcDataServiceConfig::default_txns_to_strip_filter")] + pub txns_to_strip_filter: BooleanTransactionFilter, } impl IndexerGrpcDataServiceConfig { @@ -84,7 +94,7 @@ impl IndexerGrpcDataServiceConfig { redis_read_replica_address: RedisUrl, enable_cache_compression: bool, in_memory_cache_config: InMemoryCacheConfig, - sender_addresses_to_ignore: Vec, + txns_to_strip_filter: BooleanTransactionFilter, ) -> Self { Self { data_service_grpc_tls_config, @@ -97,7 +107,7 @@ impl IndexerGrpcDataServiceConfig { redis_read_replica_address, enable_cache_compression, in_memory_cache_config, - sender_addresses_to_ignore, + txns_to_strip_filter, } } @@ -109,8 +119,9 @@ impl IndexerGrpcDataServiceConfig { false } - pub const fn default_sender_addresses_to_ignore() -> Vec { - vec![] + pub fn default_txns_to_strip_filter() -> BooleanTransactionFilter { + // This filter matches no txns. 
+ BooleanTransactionFilter::new_or(vec![]) } } @@ -170,10 +181,7 @@ impl RunnableConfig for IndexerGrpcDataServiceConfig { self.redis_read_replica_address.clone(), self.file_store_config.clone(), self.data_service_response_channel_size, - self.sender_addresses_to_ignore - .clone() - .into_iter() - .collect::>(), + self.txns_to_strip_filter.clone(), cache_storage_format, Arc::new(in_memory_cache), )?; diff --git a/ecosystem/indexer-grpc/indexer-grpc-data-service/src/service.rs b/ecosystem/indexer-grpc/indexer-grpc-data-service/src/service.rs index 60ee8045d36dd..c726d86259d33 100644 --- a/ecosystem/indexer-grpc/indexer-grpc-data-service/src/service.rs +++ b/ecosystem/indexer-grpc/indexer-grpc-data-service/src/service.rs @@ -28,11 +28,12 @@ use aptos_protos::{ indexer::v1::{raw_data_server::RawData, GetTransactionsRequest, TransactionsResponse}, transaction::v1::{transaction::TxnData, Transaction}, }; +use aptos_transaction_filter::{BooleanTransactionFilter, Filterable}; use futures::Stream; use prost::Message; use redis::Client; use std::{ - collections::{HashMap, HashSet}, + collections::HashMap, pin::Pin, str::FromStr, sync::Arc, @@ -77,7 +78,7 @@ pub struct RawDataServerWrapper { pub redis_client: Arc, pub file_store_config: IndexerGrpcFileStoreConfig, pub data_service_response_channel_size: usize, - pub sender_addresses_to_ignore: HashSet, + pub txns_to_strip_filter: BooleanTransactionFilter, pub cache_storage_format: StorageFormat, in_memory_cache: Arc, } @@ -92,10 +93,7 @@ impl std::fmt::Debug for RawDataServerWrapper { "data_service_response_channel_size", &self.data_service_response_channel_size, ) - .field( - "sender_addresses_to_ignore", - &self.sender_addresses_to_ignore, - ) + .field("txns_to_strip_filter", &self.txns_to_strip_filter) .field("cache_storage_format", &self.cache_storage_format) .finish() } @@ -106,7 +104,7 @@ impl RawDataServerWrapper { redis_address: RedisUrl, file_store_config: IndexerGrpcFileStoreConfig, 
data_service_response_channel_size: usize, - sender_addresses_to_ignore: HashSet, + txns_to_strip_filter: BooleanTransactionFilter, cache_storage_format: StorageFormat, in_memory_cache: Arc, ) -> anyhow::Result { @@ -118,7 +116,7 @@ impl RawDataServerWrapper { ), file_store_config, data_service_response_channel_size, - sender_addresses_to_ignore, + txns_to_strip_filter, cache_storage_format, in_memory_cache, }) @@ -194,7 +192,7 @@ impl RawData for RawDataServerWrapper { let redis_client = self.redis_client.clone(); let cache_storage_format = self.cache_storage_format; let request_metadata = Arc::new(request_metadata); - let sender_addresses_to_ignore = self.sender_addresses_to_ignore.clone(); + let txns_to_strip_filter = self.txns_to_strip_filter.clone(); let in_memory_cache = self.in_memory_cache.clone(); tokio::spawn({ let request_metadata = request_metadata.clone(); @@ -206,7 +204,7 @@ impl RawData for RawDataServerWrapper { request_metadata, transactions_count, tx, - sender_addresses_to_ignore, + txns_to_strip_filter, current_version, in_memory_cache, ) @@ -394,7 +392,7 @@ async fn data_fetcher_task( request_metadata: Arc, transactions_count: Option, tx: tokio::sync::mpsc::Sender>, - sender_addresses_to_ignore: HashSet, + txns_to_strip_filter: BooleanTransactionFilter, mut current_version: u64, in_memory_cache: Arc, ) { @@ -532,7 +530,7 @@ async fn data_fetcher_task( let resp_items = get_transactions_responses_builder( transaction_data, chain_id as u32, - &sender_addresses_to_ignore, + &txns_to_strip_filter, ); let data_latency_in_secs = resp_items .last() @@ -676,11 +674,10 @@ fn ensure_sequential_transactions(mut batches: Vec>) -> Vec, chain_id: u32, - sender_addresses_to_ignore: &HashSet, + txns_to_strip_filter: &BooleanTransactionFilter, ) -> Vec { - let filtered_transactions = - filter_transactions_for_sender_addresses(transactions, sender_addresses_to_ignore); - let chunks = chunk_transactions(filtered_transactions, MESSAGE_SIZE_LIMIT); + let 
stripped_transactions = strip_transactions(transactions, txns_to_strip_filter); + let chunks = chunk_transactions(stripped_transactions, MESSAGE_SIZE_LIMIT); chunks .into_iter() .map(|chunk| TransactionsResponse { @@ -956,21 +953,28 @@ async fn channel_send_multiple_with_timeout( Ok(()) } -fn filter_transactions_for_sender_addresses( +/// This function strips transactions that match the given filter. Stripping means we +/// remove the payload, signature, events, and writesets. Note, the filter can be +/// composed of many conditions, see `BooleanTransactionFilter` for more. +fn strip_transactions( transactions: Vec, - sender_addresses_to_ignore: &HashSet, + txns_to_strip_filter: &BooleanTransactionFilter, ) -> Vec { transactions .into_iter() .map(|mut txn| { - if let Some(TxnData::User(user_transaction)) = txn.txn_data.as_mut() { - if let Some(utr) = user_transaction.request.as_mut() { - if sender_addresses_to_ignore.contains(&utr.sender) { + // Note: `is_allowed` means the txn matches the filter, in which case + // we strip it. + if txns_to_strip_filter.is_allowed(&txn) { + if let Some(info) = txn.info.as_mut() { + info.changes = vec![]; + } + if let Some(TxnData::User(user_transaction)) = txn.txn_data.as_mut() { + user_transaction.events = vec![]; + if let Some(utr) = user_transaction.request.as_mut() { // Wipe the payload and signature. 
utr.payload = None; utr.signature = None; - user_transaction.events = vec![]; - txn.info.as_mut().unwrap().changes = vec![]; } } } @@ -981,12 +985,14 @@ fn filter_transactions_for_sender_addresses( #[cfg(test)] mod tests { - use super::{ensure_sequential_transactions, filter_transactions_for_sender_addresses}; + use super::*; use aptos_protos::transaction::v1::{ transaction::TxnData, Event, Signature, Transaction, TransactionInfo, TransactionPayload, UserTransaction, UserTransactionRequest, WriteSetChange, }; - use std::collections::HashSet; + use aptos_transaction_filter::{ + boolean_transaction_filter::APIFilter, filters::UserTransactionFilterBuilder, + }; #[test] fn test_ensure_sequential_transactions_merges_and_sorts() { @@ -1034,7 +1040,7 @@ mod tests { } #[test] - fn test_transactions_are_filter_correctly() { + fn test_transactions_are_stripped_correctly_sender_addresses() { let sender_address = "0x1234".to_string(); // Create a transaction with a user transaction let txn = Transaction { @@ -1054,10 +1060,22 @@ mod tests { }), ..Default::default() }; - // create ignore list. - let ignore_hash_set: HashSet = vec![sender_address].into_iter().collect(); - let filtered_txn = filter_transactions_for_sender_addresses(vec![txn], &ignore_hash_set); + // Create filter for senders to ignore. + let sender_filters = vec![sender_address] + .into_iter() + .map(|address| { + BooleanTransactionFilter::from(APIFilter::UserTransactionFilter( + UserTransactionFilterBuilder::default() + .sender(address) + .build() + .unwrap(), + )) + }) + .collect(); + let filter = BooleanTransactionFilter::new_or(sender_filters); + + let filtered_txn = strip_transactions(vec![txn], &filter); assert_eq!(filtered_txn.len(), 1); let txn = filtered_txn.first().unwrap(); let user_transaction = match &txn.txn_data {