From 05aa06d2b8b12f3c72851024876cf71d11ddc25b Mon Sep 17 00:00:00 2001 From: Daniel Porteous Date: Fri, 21 Jun 2024 11:53:25 +0200 Subject: [PATCH] [Data Service] Implement simple upstream transaction filtering There are two types of transaction filtering we will support in the future: 1. Per stream configuration: The downstream declares what txns they want to receive. 2. Global configuration: At the data service level we refuse to include full txns for all streams. This PR implements the second of these, using @CapCap's work here: https://github.com/aptos-labs/aptos-indexer-processors/pull/398. Rather than not sending txns at all if they match the blocklist filters, we just omit the writesets and events. Not sending the txns entirely would cause issues with processors, which today assume that they will receive all txns. --- Cargo.lock | 13 +- .../indexer-grpc-data-service/Cargo.toml | 1 + .../indexer-grpc-data-service/src/config.rs | 32 +++-- .../indexer-grpc-data-service/src/metrics.rs | 52 ++++++- .../indexer-grpc-data-service/src/service.rs | 127 ++++++++++++------ 5 files changed, 162 insertions(+), 63 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 481707efdfdd3..6f614830f71cb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2013,6 +2013,7 @@ dependencies = [ "aptos-metrics-core", "aptos-moving-average 0.1.0 (git+https://github.com/aptos-labs/aptos-indexer-processors.git?rev=4801acae7aea30d7e96bbfbe5ec5b04056dfa4cf)", "aptos-protos 1.3.0", + "aptos-transaction-filter", "async-trait", "clap 4.4.14", "futures", @@ -10189,9 +10190,9 @@ dependencies = [ [[package]] name = "memchr" -version = "2.7.1" +version = "2.7.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "523dc4f511e55ab87b694dc30d0f820d60906ef06413f93d4d7a1385599cc149" +checksum = "78ca9ab1a0babb1e7d5695e3530886289c18cf2f87ec19a575a0abdce112e3a3" [[package]] name = "memmap2" @@ -15431,18 +15432,18 @@ checksum = "222a222a5bfe1bba4a77b45ec488a741b3cb8872e5e499451fd7d0129c9c7c3d" [[package]] name = "thiserror" -version = "1.0.56" +version = "1.0.61" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d54378c645627613241d077a3a79db965db602882668f9136ac42af9ecb730ad" +checksum = "c546c80d6be4bc6a00c0f01730c08df82eaa7a7a61f11d656526506112cc1709" dependencies = [ "thiserror-impl", ] [[package]] name = "thiserror-impl" -version = "1.0.56" +version = "1.0.61" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fa0faa943b50f3db30a20aa7e265dbc66076993efed8463e8de414e5d06d3471" +checksum = "46c3384250002a6d5af4d114f2845d37b57521033f30d5c3f46c4d70e1197533" dependencies = [ "proc-macro2", "quote", diff --git a/ecosystem/indexer-grpc/indexer-grpc-data-service/Cargo.toml b/ecosystem/indexer-grpc/indexer-grpc-data-service/Cargo.toml index c378e10000297..ae70ba89314fe 100644 --- a/ecosystem/indexer-grpc/indexer-grpc-data-service/Cargo.toml +++ b/ecosystem/indexer-grpc/indexer-grpc-data-service/Cargo.toml @@ -19,6 +19,7 @@ aptos-indexer-grpc-utils = { workspace = true } aptos-metrics-core = { workspace = true } aptos-moving-average = { workspace = true } aptos-protos = { workspace = true } +aptos-transaction-filter = { workspace = true } async-trait = { workspace = true } clap = { workspace = true } futures = { workspace = true } diff --git a/ecosystem/indexer-grpc/indexer-grpc-data-service/src/config.rs b/ecosystem/indexer-grpc/indexer-grpc-data-service/src/config.rs index 797154c86cf8b..c5f621fcde703 100644 --- a/ecosystem/indexer-grpc/indexer-grpc-data-service/src/config.rs +++ b/ecosystem/indexer-grpc/indexer-grpc-data-service/src/config.rs @@ -13,8 +13,9 @@ use aptos_protos::{ transaction::v1::FILE_DESCRIPTOR_SET as TRANSACTION_V1_TESTING_FILE_DESCRIPTOR_SET, util::timestamp::FILE_DESCRIPTOR_SET as UTIL_TIMESTAMP_FILE_DESCRIPTOR_SET, }; +use aptos_transaction_filter::BooleanTransactionFilter; use serde::{Deserialize, Serialize}; -use std::{collections::HashSet, net::SocketAddr, sync::Arc}; +use std::{net::SocketAddr, sync::Arc}; use tonic::{codec::CompressionEncoding, transport::Server}; pub const SERVER_NAME: &str = "idxdatasvc"; @@ -69,9 +70,18 @@ pub struct IndexerGrpcDataServiceConfig { pub enable_cache_compression: bool, #[serde(default)] pub in_memory_cache_config: InMemoryCacheConfig, - /// Sender addresses to ignore. Transactions from these addresses will not be indexed. - #[serde(default = "IndexerGrpcDataServiceConfig::default_sender_addresses_to_ignore")] - pub sender_addresses_to_ignore: Vec, + /// Any transaction that matches this filter will be stripped. This means we remove + /// the payload, signature, events, and writesets from it before sending it + /// downstream. This should only be used in an emergency situation, e.g. when txns + /// related to a certain module are too large and are causing issues for the data + /// service. Learn more here: + /// + /// https://www.notion.so/aptoslabs/Runbook-c006a37259394ac2ba904d6b54d180fa?pvs=4#171c210964ec42a89574fc80154f9e85 + /// + /// Generally you will want to start with this with an OR, and then list out + /// separate filters that describe each type of txn we want to strip. + #[serde(default = "IndexerGrpcDataServiceConfig::default_txns_to_strip_filter")] + pub txns_to_strip_filter: BooleanTransactionFilter, } impl IndexerGrpcDataServiceConfig { @@ -84,7 +94,7 @@ impl IndexerGrpcDataServiceConfig { redis_read_replica_address: RedisUrl, enable_cache_compression: bool, in_memory_cache_config: InMemoryCacheConfig, - sender_addresses_to_ignore: Vec, + txns_to_strip_filter: BooleanTransactionFilter, ) -> Self { Self { data_service_grpc_tls_config, @@ -97,7 +107,7 @@ impl IndexerGrpcDataServiceConfig { redis_read_replica_address, enable_cache_compression, in_memory_cache_config, - sender_addresses_to_ignore, + txns_to_strip_filter, } } @@ -109,8 +119,9 @@ impl IndexerGrpcDataServiceConfig { false } - pub const fn default_sender_addresses_to_ignore() -> Vec { - vec![] + pub fn default_txns_to_strip_filter() -> BooleanTransactionFilter { + // This filter matches no txns. + BooleanTransactionFilter::new_or(vec![]) } } @@ -170,10 +181,7 @@ impl RunnableConfig for IndexerGrpcDataServiceConfig { self.redis_read_replica_address.clone(), self.file_store_config.clone(), self.data_service_response_channel_size, - self.sender_addresses_to_ignore - .clone() - .into_iter() - .collect::>(), + self.txns_to_strip_filter.clone(), cache_storage_format, Arc::new(in_memory_cache), )?; diff --git a/ecosystem/indexer-grpc/indexer-grpc-data-service/src/metrics.rs b/ecosystem/indexer-grpc/indexer-grpc-data-service/src/metrics.rs index 908b42ed9a96d..4813efda1aed9 100644 --- a/ecosystem/indexer-grpc/indexer-grpc-data-service/src/metrics.rs +++ b/ecosystem/indexer-grpc/indexer-grpc-data-service/src/metrics.rs @@ -100,12 +100,58 @@ pub static SHORT_CONNECTION_COUNT: Lazy = Lazy::new(|| { .unwrap() }); -/// Count of bytes transfered to the client. This only represents the bytes prepared and ready -/// to send to the client. It does not represent the bytes actually sent to the client. +/// Count of bytes transfered to the client. This only represents the bytes prepared and +/// ready to send to the client. This only t It does not represent the bytes actually +/// sent to the client. +/// +/// This is pre stripping, so it may include bytes for transactions that were later +/// stripped. See BYTES_READY_TO_TRANSFER_FROM_SERVER_AFTER_STRIPPING for post +/// stirpping. pub static BYTES_READY_TO_TRANSFER_FROM_SERVER: Lazy = Lazy::new(|| { register_int_counter_vec!( "indexer_grpc_data_service_bytes_ready_to_transfer_from_server", - "Count of bytes ready to transfer to the client", + "Count of bytes ready to transfer to the client (pre stripping)", + &[ + "identifier_type", + "identifier", + "email", + "application_name", + "processor" + ], + ) + .unwrap() +}); + +/// Count of bytes transfered to the client. This only represents the bytes prepared and +/// ready to send to the client. This only t It does not represent the bytes actually +/// sent to the client. +/// +/// This is post stripping, meaning some transactions may have been stripped (removing +/// things such as events, writesets, payload, signature). Compare this with +/// BYTES_READY_TO_TRANSFER_FROM_SERVER to see how many bytes were stripped. +pub static BYTES_READY_TO_TRANSFER_FROM_SERVER_AFTER_STRIPPING: Lazy = + Lazy::new(|| { + register_int_counter_vec!( + "indexer_grpc_data_service_bytes_ready_to_transfer_from_server_after_stripping", + "Count of bytes ready to transfer to the client (post stripping)", + &[ + "identifier_type", + "identifier", + "email", + "application_name", + "processor" + ], + ) + .unwrap() + }); + +/// The number of transactions that had data (such as events, writesets, payload, +/// signature) stripped from them due to the `txns_to_strip_filter`. See +/// `strip_transactions` for more. +pub static NUM_TRANSACTIONS_STRIPPED: Lazy = Lazy::new(|| { + register_int_counter_vec!( + "indexer_grpc_data_service_num_transactions_stripped", + "Number of transactions that had data (such as events, writesets, payload, signature) stripped from them", &[ "identifier_type", "identifier", diff --git a/ecosystem/indexer-grpc/indexer-grpc-data-service/src/service.rs b/ecosystem/indexer-grpc/indexer-grpc-data-service/src/service.rs index 60ee8045d36dd..752c642f59f72 100644 --- a/ecosystem/indexer-grpc/indexer-grpc-data-service/src/service.rs +++ b/ecosystem/indexer-grpc/indexer-grpc-data-service/src/service.rs @@ -2,8 +2,9 @@ // SPDX-License-Identifier: Apache-2.0 use crate::metrics::{ - BYTES_READY_TO_TRANSFER_FROM_SERVER, CONNECTION_COUNT, ERROR_COUNT, - LATEST_PROCESSED_VERSION_PER_PROCESSOR, PROCESSED_LATENCY_IN_SECS_PER_PROCESSOR, + BYTES_READY_TO_TRANSFER_FROM_SERVER, BYTES_READY_TO_TRANSFER_FROM_SERVER_AFTER_STRIPPING, + CONNECTION_COUNT, ERROR_COUNT, LATEST_PROCESSED_VERSION_PER_PROCESSOR, + NUM_TRANSACTIONS_STRIPPED, PROCESSED_LATENCY_IN_SECS_PER_PROCESSOR, PROCESSED_VERSIONS_COUNT_PER_PROCESSOR, SHORT_CONNECTION_COUNT, }; use anyhow::{Context, Result}; @@ -28,11 +29,12 @@ use aptos_protos::{ indexer::v1::{raw_data_server::RawData, GetTransactionsRequest, TransactionsResponse}, transaction::v1::{transaction::TxnData, Transaction}, }; +use aptos_transaction_filter::{BooleanTransactionFilter, Filterable}; use futures::Stream; use prost::Message; use redis::Client; use std::{ - collections::{HashMap, HashSet}, + collections::HashMap, pin::Pin, str::FromStr, sync::Arc, @@ -77,7 +79,7 @@ pub struct RawDataServerWrapper { pub redis_client: Arc, pub file_store_config: IndexerGrpcFileStoreConfig, pub data_service_response_channel_size: usize, - pub sender_addresses_to_ignore: HashSet, + pub txns_to_strip_filter: BooleanTransactionFilter, pub cache_storage_format: StorageFormat, in_memory_cache: Arc, } @@ -92,10 +94,7 @@ impl std::fmt::Debug for RawDataServerWrapper { "data_service_response_channel_size", &self.data_service_response_channel_size, ) - .field( - "sender_addresses_to_ignore", - &self.sender_addresses_to_ignore, - ) + .field("txns_to_strip_filter", &self.txns_to_strip_filter) .field("cache_storage_format", &self.cache_storage_format) .finish() } @@ -106,7 +105,7 @@ impl RawDataServerWrapper { redis_address: RedisUrl, file_store_config: IndexerGrpcFileStoreConfig, data_service_response_channel_size: usize, - sender_addresses_to_ignore: HashSet, + txns_to_strip_filter: BooleanTransactionFilter, cache_storage_format: StorageFormat, in_memory_cache: Arc, ) -> anyhow::Result { @@ -118,7 +117,7 @@ impl RawDataServerWrapper { ), file_store_config, data_service_response_channel_size, - sender_addresses_to_ignore, + txns_to_strip_filter, cache_storage_format, in_memory_cache, }) @@ -194,7 +193,7 @@ impl RawData for RawDataServerWrapper { let redis_client = self.redis_client.clone(); let cache_storage_format = self.cache_storage_format; let request_metadata = Arc::new(request_metadata); - let sender_addresses_to_ignore = self.sender_addresses_to_ignore.clone(); + let txns_to_strip_filter = self.txns_to_strip_filter.clone(); let in_memory_cache = self.in_memory_cache.clone(); tokio::spawn({ let request_metadata = request_metadata.clone(); @@ -206,7 +205,7 @@ impl RawData for RawDataServerWrapper { request_metadata, transactions_count, tx, - sender_addresses_to_ignore, + txns_to_strip_filter, current_version, in_memory_cache, ) @@ -394,7 +393,7 @@ async fn data_fetcher_task( request_metadata: Arc, transactions_count: Option, tx: tokio::sync::mpsc::Sender>, - sender_addresses_to_ignore: HashSet, + txns_to_strip_filter: BooleanTransactionFilter, mut current_version: u64, in_memory_cache: Arc, ) { @@ -529,11 +528,22 @@ async fn data_fetcher_task( // 2. Push the data to the response channel, i.e. stream the data to the client. let current_batch_size = transaction_data.as_slice().len(); let end_of_batch_version = transaction_data.as_slice().last().unwrap().version; - let resp_items = get_transactions_responses_builder( + let (resp_items, num_stripped) = get_transactions_responses_builder( transaction_data, chain_id as u32, - &sender_addresses_to_ignore, + &txns_to_strip_filter, ); + NUM_TRANSACTIONS_STRIPPED + .with_label_values(&request_metadata.get_label_values()) + .inc_by(num_stripped as u64); + let bytes_ready_to_transfer_after_stripping = resp_items + .iter() + .flat_map(|response| &response.transactions) + .map(|t| t.encoded_len()) + .sum::(); + BYTES_READY_TO_TRANSFER_FROM_SERVER_AFTER_STRIPPING + .with_label_values(&request_metadata.get_label_values()) + .inc_by(bytes_ready_to_transfer_after_stripping as u64); let data_latency_in_secs = resp_items .last() .unwrap() @@ -548,7 +558,7 @@ async fn data_fetcher_task( .await { Ok(_) => { - // TODO: Reasses whether this metric useful + // TODO: Reasses whether this metric is useful. LATEST_PROCESSED_VERSION_PER_PROCESSOR .with_label_values(&request_metadata.get_label_values()) .set(end_of_batch_version as i64); @@ -672,22 +682,26 @@ fn ensure_sequential_transactions(mut batches: Vec>) -> Vec, chain_id: u32, - sender_addresses_to_ignore: &HashSet, -) -> Vec { - let filtered_transactions = - filter_transactions_for_sender_addresses(transactions, sender_addresses_to_ignore); - let chunks = chunk_transactions(filtered_transactions, MESSAGE_SIZE_LIMIT); - chunks + txns_to_strip_filter: &BooleanTransactionFilter, +) -> (Vec, usize) { + let (stripped_transactions, num_stripped) = + strip_transactions(transactions, txns_to_strip_filter); + let chunks = chunk_transactions(stripped_transactions, MESSAGE_SIZE_LIMIT); + let responses = chunks .into_iter() .map(|chunk| TransactionsResponse { chain_id: Some(chain_id as u64), transactions: chunk, }) - .collect() + .collect(); + (responses, num_stripped) } // This is a CPU bound operation, so we spawn_blocking @@ -956,37 +970,53 @@ async fn channel_send_multiple_with_timeout( Ok(()) } -fn filter_transactions_for_sender_addresses( +/// This function strips transactions that match the given filter. Stripping means we +/// remove the payload, signature, events, and writesets. Note, the filter can be +/// composed of many conditions, see `BooleanTransactionFilter` for more. +/// +/// This returns the mutated txns and the number of txns that were stripped. +fn strip_transactions( transactions: Vec, - sender_addresses_to_ignore: &HashSet, -) -> Vec { - transactions + txns_to_strip_filter: &BooleanTransactionFilter, +) -> (Vec, usize) { + let mut stripped_count = 0; + + let stripped_transactions: Vec = transactions .into_iter() .map(|mut txn| { - if let Some(TxnData::User(user_transaction)) = txn.txn_data.as_mut() { - if let Some(utr) = user_transaction.request.as_mut() { - if sender_addresses_to_ignore.contains(&utr.sender) { + // Note: `is_allowed` means the txn matches the filter, in which case + // we strip it. + if txns_to_strip_filter.is_allowed(&txn) { + stripped_count += 1; + if let Some(info) = txn.info.as_mut() { + info.changes = vec![]; + } + if let Some(TxnData::User(user_transaction)) = txn.txn_data.as_mut() { + user_transaction.events = vec![]; + if let Some(utr) = user_transaction.request.as_mut() { // Wipe the payload and signature. utr.payload = None; utr.signature = None; - user_transaction.events = vec![]; - txn.info.as_mut().unwrap().changes = vec![]; } } } txn }) - .collect() + .collect(); + + (stripped_transactions, stripped_count) } #[cfg(test)] mod tests { - use super::{ensure_sequential_transactions, filter_transactions_for_sender_addresses}; + use super::*; use aptos_protos::transaction::v1::{ transaction::TxnData, Event, Signature, Transaction, TransactionInfo, TransactionPayload, UserTransaction, UserTransactionRequest, WriteSetChange, }; - use std::collections::HashSet; + use aptos_transaction_filter::{ + boolean_transaction_filter::APIFilter, filters::UserTransactionFilterBuilder, + }; #[test] fn test_ensure_sequential_transactions_merges_and_sorts() { @@ -1034,7 +1064,7 @@ mod tests { } #[test] - fn test_transactions_are_filter_correctly() { + fn test_transactions_are_stripped_correctly_sender_addresses() { let sender_address = "0x1234".to_string(); // Create a transaction with a user transaction let txn = Transaction { @@ -1054,12 +1084,25 @@ mod tests { }), ..Default::default() }; - // create ignore list. - let ignore_hash_set: HashSet = vec![sender_address].into_iter().collect(); - let filtered_txn = filter_transactions_for_sender_addresses(vec![txn], &ignore_hash_set); - assert_eq!(filtered_txn.len(), 1); - let txn = filtered_txn.first().unwrap(); + // Create filter for senders to ignore. + let sender_filters = vec![sender_address] + .into_iter() + .map(|address| { + BooleanTransactionFilter::from(APIFilter::UserTransactionFilter( + UserTransactionFilterBuilder::default() + .sender(address) + .build() + .unwrap(), + )) + }) + .collect(); + let filter = BooleanTransactionFilter::new_or(sender_filters); + + let (filtered_txns, num_stripped) = strip_transactions(vec![txn], &filter); + assert_eq!(num_stripped, 1); + assert_eq!(filtered_txns.len(), 1); + let txn = filtered_txns.first().unwrap(); let user_transaction = match &txn.txn_data { Some(TxnData::User(user_transaction)) => user_transaction, _ => panic!("Expected user transaction"),