Skip to content

Commit

Permalink
[Data Service] Implement simple upstream transaction filtering
Browse files Browse the repository at this point in the history
There are two types of transaction filtering we will support in the future:
1. Per stream configuration: The downstream declares what txns they want to receive.
2. Global configuration: At the data service level we refuse to include full txns for all streams.

This PR implements the second of these, using @CapCap's work here:
aptos-labs/aptos-indexer-processors#398.

Rather than not sending txns at all if they match the blocklist filters, we just omit the writesets and events. Not sending the txns entirely would cause issues with processors, which today assume that they will receive all txns.
  • Loading branch information
banool committed Jun 19, 2024
1 parent c51f147 commit e10b908
Show file tree
Hide file tree
Showing 5 changed files with 75 additions and 47 deletions.
13 changes: 7 additions & 6 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -475,6 +475,7 @@ ark-groth16 = "0.4.0"
ark-serialize = "0.4.0"
ark-std = { version = "0.4.0", features = ["getrandom"] }
aptos-moving-average = { git = "https://github.com/aptos-labs/aptos-indexer-processors.git", rev = "4801acae7aea30d7e96bbfbe5ec5b04056dfa4cf" }
transaction-filter = { git = "https://github.com/aptos-labs/aptos-indexer-processors.git", rev = "35c362a10224ba0a22af8d03c515652c6bc62eb4" }
assert_approx_eq = "1.1.0"
assert_unordered = "0.3.5"
async-channel = "1.7.1"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ aptos-metrics-core = { workspace = true }
aptos-moving-average = { workspace = true }
aptos-protos = { workspace = true }
async-trait = { workspace = true }
aptos-transaction-filter = { workspace = true }
clap = { workspace = true }
futures = { workspace = true }
once_cell = { workspace = true }
Expand Down
31 changes: 19 additions & 12 deletions ecosystem/indexer-grpc/indexer-grpc-data-service/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,9 @@ use aptos_protos::{
transaction::v1::FILE_DESCRIPTOR_SET as TRANSACTION_V1_TESTING_FILE_DESCRIPTOR_SET,
util::timestamp::FILE_DESCRIPTOR_SET as UTIL_TIMESTAMP_FILE_DESCRIPTOR_SET,
};
use aptos_transaction_filter::BooleanTransactionFilter;
use serde::{Deserialize, Serialize};
use std::{collections::HashSet, net::SocketAddr, sync::Arc};
use std::{net::SocketAddr, sync::Arc};
use tonic::{codec::CompressionEncoding, transport::Server};

pub const SERVER_NAME: &str = "idxdatasvc";
Expand Down Expand Up @@ -69,9 +70,17 @@ pub struct IndexerGrpcDataServiceConfig {
pub enable_cache_compression: bool,
#[serde(default)]
pub in_memory_cache_config: InMemoryCacheConfig,
/// Sender addresses to ignore. Transactions from these addresses will not be indexed.
#[serde(default = "IndexerGrpcDataServiceConfig::default_sender_addresses_to_ignore")]
pub sender_addresses_to_ignore: Vec<String>,
/// Any transaction that matches this filter will be stripped, meaning we remove
/// events and writesets from it before sending it downstream. This should only be
/// used in an emergency situation, e.g. when txns related to a certain module are
/// too large and are causing issues for the data service. Learn more here:
///
/// https://www.notion.so/aptoslabs/Runbook-c006a37259394ac2ba904d6b54d180fa?pvs=4#171c210964ec42a89574fc80154f9e85
///
/// Generally you will want to start with this with an OR, and then list out
/// separate filters that describe each type of txn we want to strip.
#[serde(default = "IndexerGrpcDataServiceConfig::default_txns_to_strip_filter")]
pub txns_to_strip_filter: BooleanTransactionFilter,
}

impl IndexerGrpcDataServiceConfig {
Expand All @@ -84,7 +93,7 @@ impl IndexerGrpcDataServiceConfig {
redis_read_replica_address: RedisUrl,
enable_cache_compression: bool,
in_memory_cache_config: InMemoryCacheConfig,
sender_addresses_to_ignore: Vec<String>,
txns_to_strip_filter: BooleanTransactionFilter,
) -> Self {
Self {
data_service_grpc_tls_config,
Expand All @@ -97,7 +106,7 @@ impl IndexerGrpcDataServiceConfig {
redis_read_replica_address,
enable_cache_compression,
in_memory_cache_config,
sender_addresses_to_ignore,
txns_to_strip_filter,
}
}

Expand All @@ -109,8 +118,9 @@ impl IndexerGrpcDataServiceConfig {
false
}

pub const fn default_sender_addresses_to_ignore() -> Vec<String> {
vec![]
pub fn default_txns_to_strip_filter() -> BooleanTransactionFilter {
// This filter matches no txns.
BooleanTransactionFilter::new_or(vec![])
}
}

Expand Down Expand Up @@ -170,10 +180,7 @@ impl RunnableConfig for IndexerGrpcDataServiceConfig {
self.redis_read_replica_address.clone(),
self.file_store_config.clone(),
self.data_service_response_channel_size,
self.sender_addresses_to_ignore
.clone()
.into_iter()
.collect::<HashSet<_>>(),
self.txns_to_strip_filter.clone(),
cache_storage_format,
Arc::new(in_memory_cache),
)?;
Expand Down
76 changes: 47 additions & 29 deletions ecosystem/indexer-grpc/indexer-grpc-data-service/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,12 @@ use aptos_protos::{
indexer::v1::{raw_data_server::RawData, GetTransactionsRequest, TransactionsResponse},
transaction::v1::{transaction::TxnData, Transaction},
};
use aptos_transaction_filter::{BooleanTransactionFilter, Filterable};
use futures::Stream;
use prost::Message;
use redis::Client;
use std::{
collections::{HashMap, HashSet},
collections::HashMap,
pin::Pin,
str::FromStr,
sync::Arc,
Expand Down Expand Up @@ -77,7 +78,7 @@ pub struct RawDataServerWrapper {
pub redis_client: Arc<redis::Client>,
pub file_store_config: IndexerGrpcFileStoreConfig,
pub data_service_response_channel_size: usize,
pub sender_addresses_to_ignore: HashSet<String>,
pub txns_to_strip_filter: BooleanTransactionFilter,
pub cache_storage_format: StorageFormat,
in_memory_cache: Arc<InMemoryCache>,
}
Expand All @@ -92,10 +93,7 @@ impl std::fmt::Debug for RawDataServerWrapper {
"data_service_response_channel_size",
&self.data_service_response_channel_size,
)
.field(
"sender_addresses_to_ignore",
&self.sender_addresses_to_ignore,
)
.field("txns_to_strip_filter", &self.txns_to_strip_filter)
.field("cache_storage_format", &self.cache_storage_format)
.finish()
}
Expand All @@ -106,7 +104,7 @@ impl RawDataServerWrapper {
redis_address: RedisUrl,
file_store_config: IndexerGrpcFileStoreConfig,
data_service_response_channel_size: usize,
sender_addresses_to_ignore: HashSet<String>,
txns_to_strip_filter: BooleanTransactionFilter,
cache_storage_format: StorageFormat,
in_memory_cache: Arc<InMemoryCache>,
) -> anyhow::Result<Self> {
Expand All @@ -118,7 +116,7 @@ impl RawDataServerWrapper {
),
file_store_config,
data_service_response_channel_size,
sender_addresses_to_ignore,
txns_to_strip_filter,
cache_storage_format,
in_memory_cache,
})
Expand Down Expand Up @@ -194,7 +192,7 @@ impl RawData for RawDataServerWrapper {
let redis_client = self.redis_client.clone();
let cache_storage_format = self.cache_storage_format;
let request_metadata = Arc::new(request_metadata);
let sender_addresses_to_ignore = self.sender_addresses_to_ignore.clone();
let txns_to_strip_filter = self.txns_to_strip_filter.clone();
let in_memory_cache = self.in_memory_cache.clone();
tokio::spawn({
let request_metadata = request_metadata.clone();
Expand All @@ -206,7 +204,7 @@ impl RawData for RawDataServerWrapper {
request_metadata,
transactions_count,
tx,
sender_addresses_to_ignore,
txns_to_strip_filter,
current_version,
in_memory_cache,
)
Expand Down Expand Up @@ -394,7 +392,7 @@ async fn data_fetcher_task(
request_metadata: Arc<IndexerGrpcRequestMetadata>,
transactions_count: Option<u64>,
tx: tokio::sync::mpsc::Sender<Result<TransactionsResponse, Status>>,
sender_addresses_to_ignore: HashSet<String>,
txns_to_strip_filter: BooleanTransactionFilter,
mut current_version: u64,
in_memory_cache: Arc<InMemoryCache>,
) {
Expand Down Expand Up @@ -532,7 +530,7 @@ async fn data_fetcher_task(
let resp_items = get_transactions_responses_builder(
transaction_data,
chain_id as u32,
&sender_addresses_to_ignore,
&txns_to_strip_filter,
);
let data_latency_in_secs = resp_items
.last()
Expand Down Expand Up @@ -676,11 +674,10 @@ fn ensure_sequential_transactions(mut batches: Vec<Vec<Transaction>>) -> Vec<Tra
fn get_transactions_responses_builder(
transactions: Vec<Transaction>,
chain_id: u32,
sender_addresses_to_ignore: &HashSet<String>,
txns_to_strip_filter: &BooleanTransactionFilter,
) -> Vec<TransactionsResponse> {
let filtered_transactions =
filter_transactions_for_sender_addresses(transactions, sender_addresses_to_ignore);
let chunks = chunk_transactions(filtered_transactions, MESSAGE_SIZE_LIMIT);
let stripped_transactions = strip_transactions(transactions, txns_to_strip_filter);
let chunks = chunk_transactions(stripped_transactions, MESSAGE_SIZE_LIMIT);
chunks
.into_iter()
.map(|chunk| TransactionsResponse {
Expand Down Expand Up @@ -956,21 +953,28 @@ async fn channel_send_multiple_with_timeout(
Ok(())
}

fn filter_transactions_for_sender_addresses(
/// This function strips transactions that match the given filter. Stripping means we
/// remove the payload, signature, events, and writesets. Note, the filter can be
/// composed of many conditions, see `BooleanTransactionFilter` for more.
fn strip_transactions(
transactions: Vec<Transaction>,
sender_addresses_to_ignore: &HashSet<String>,
txns_to_strip_filter: &BooleanTransactionFilter,
) -> Vec<Transaction> {
transactions
.into_iter()
.map(|mut txn| {
if let Some(TxnData::User(user_transaction)) = txn.txn_data.as_mut() {
if let Some(utr) = user_transaction.request.as_mut() {
if sender_addresses_to_ignore.contains(&utr.sender) {
// Note: `is_allowed` means the txn is matches the filter, in which case
// we strip it.
if txns_to_strip_filter.is_allowed(&txn) {
if let Some(info) = txn.info.as_mut() {
info.changes = vec![];
}
if let Some(TxnData::User(user_transaction)) = txn.txn_data.as_mut() {
user_transaction.events = vec![];
if let Some(utr) = user_transaction.request.as_mut() {
// Wipe the payload and signature.
utr.payload = None;
utr.signature = None;
user_transaction.events = vec![];
txn.info.as_mut().unwrap().changes = vec![];
}
}
}
Expand All @@ -981,12 +985,14 @@ fn filter_transactions_for_sender_addresses(

#[cfg(test)]
mod tests {
use super::{ensure_sequential_transactions, filter_transactions_for_sender_addresses};
use super::*;
use aptos_protos::transaction::v1::{
transaction::TxnData, Event, Signature, Transaction, TransactionInfo, TransactionPayload,
UserTransaction, UserTransactionRequest, WriteSetChange,
};
use std::collections::HashSet;
use aptos_transaction_filter::{
boolean_transaction_filter::APIFilter, filters::UserTransactionFilterBuilder,
};

#[test]
fn test_ensure_sequential_transactions_merges_and_sorts() {
Expand Down Expand Up @@ -1034,7 +1040,7 @@ mod tests {
}

#[test]
fn test_transactions_are_filter_correctly() {
fn test_transactions_are_stripped_correctly_sender_addresses() {
let sender_address = "0x1234".to_string();
// Create a transaction with a user transaction
let txn = Transaction {
Expand All @@ -1054,10 +1060,22 @@ mod tests {
}),
..Default::default()
};
// create ignore list.
let ignore_hash_set: HashSet<String> = vec![sender_address].into_iter().collect();

let filtered_txn = filter_transactions_for_sender_addresses(vec![txn], &ignore_hash_set);
// Create filter for senders to ignore.
let sender_filters = vec![sender_address]
.into_iter()
.map(|address| {
BooleanTransactionFilter::from(APIFilter::UserTransactionFilter(
UserTransactionFilterBuilder::default()
.sender(address)
.build()
.unwrap(),
))
})
.collect();
let filter = BooleanTransactionFilter::new_or(sender_filters);

let filtered_txn = strip_transactions(vec![txn], &filter);
assert_eq!(filtered_txn.len(), 1);
let txn = filtered_txn.first().unwrap();
let user_transaction = match &txn.txn_data {
Expand Down

0 comments on commit e10b908

Please sign in to comment.