From e4b4fd274b773a55728b2e625f70e4ee5c408789 Mon Sep 17 00:00:00 2001 From: CapCap Date: Thu, 30 May 2024 19:32:32 -0700 Subject: [PATCH 1/7] Simpler Transaction Filtering --- rust/Cargo.lock | 1 + rust/Cargo.toml | 2 + rust/processor/src/utils/util.rs | 5 +- rust/transaction-filter/Cargo.toml | 3 + .../transaction-filter/src/filter_operator.rs | 334 ++++++++++++++++++ .../src/filters/event_filter.rs | 48 +++ rust/transaction-filter/src/filters/mod.rs | 2 + .../src/filters/positional.rs | 47 +++ .../src/filters/user_transaction_request.rs | 13 +- .../src/filters/write_set_change_filter.rs | 32 +- rust/transaction-filter/src/json_search.rs | 290 +++++++++++++++ rust/transaction-filter/src/lib.rs | 13 +- 12 files changed, 777 insertions(+), 13 deletions(-) create mode 100644 rust/transaction-filter/src/filter_operator.rs create mode 100644 rust/transaction-filter/src/filters/event_filter.rs create mode 100644 rust/transaction-filter/src/filters/positional.rs create mode 100644 rust/transaction-filter/src/json_search.rs diff --git a/rust/Cargo.lock b/rust/Cargo.lock index ce03320fe..57990266f 100644 --- a/rust/Cargo.lock +++ b/rust/Cargo.lock @@ -3624,6 +3624,7 @@ dependencies = [ "anyhow", "aptos-protos", "lz4", + "memchr", "prost 0.12.3", "serde", "serde_json", diff --git a/rust/Cargo.toml b/rust/Cargo.toml index af64c08e7..aa7762b39 100644 --- a/rust/Cargo.toml +++ b/rust/Cargo.toml @@ -67,6 +67,8 @@ jemallocator = { version = "0.5.0", features = [ ] } kanal = { version = "0.1.0-pre8", features = ["async"] } once_cell = "1.10.0" +# SIMD for string search +memchr = "2.7.2" num_cpus = "1.16.0" pbjson = "0.5.1" prometheus = { version = "0.13.0", default-features = false } diff --git a/rust/processor/src/utils/util.rs b/rust/processor/src/utils/util.rs index 14f750322..9638da9be 100644 --- a/rust/processor/src/utils/util.rs +++ b/rust/processor/src/utils/util.rs @@ -53,8 +53,11 @@ pub struct MultisigPayloadClean { } /// Standardizes all addresses and table handles to 
be length 66 (0x-64 length hash) +#[inline] pub fn standardize_address(handle: &str) -> String { - if let Some(handle) = handle.strip_prefix("0x") { + if handle.len() == 66 { + handle.to_string() + } else if let Some(handle) = handle.strip_prefix("0x") { format!("0x{:0>64}", handle) } else { format!("0x{:0>64}", handle) diff --git a/rust/transaction-filter/Cargo.toml b/rust/transaction-filter/Cargo.toml index 7fe85ab00..2d68aa40e 100644 --- a/rust/transaction-filter/Cargo.toml +++ b/rust/transaction-filter/Cargo.toml @@ -16,6 +16,9 @@ rust-version = { workspace = true } anyhow = { workspace = true } aptos-protos = { workspace = true } +# SIMD for string search. TODO: benchmark this on various real inputs to see if it's worth it +memchr = { workspace = true } + prost = { workspace = true } serde = { workspace = true } diff --git a/rust/transaction-filter/src/filter_operator.rs b/rust/transaction-filter/src/filter_operator.rs new file mode 100644 index 000000000..265820be1 --- /dev/null +++ b/rust/transaction-filter/src/filter_operator.rs @@ -0,0 +1,334 @@ +// use crate::traits::Filterable; +use crate::{ + filters::{ + EventFilter, TransactionRootFilter, UserTransactionRequestFilter, WriteSetChangeFilter, + }, + traits::Filterable, +}; +use anyhow::Error; +use aptos_protos::transaction::v1::{transaction::TxnData, Transaction}; +use serde::{Deserialize, Serialize}; +use std::fmt::Debug; + +/// These are filters we would expect to be exposed via API +#[derive(Debug, Deserialize, PartialEq, Serialize)] +#[serde(deny_unknown_fields)] +#[serde(tag = "type")] +pub enum APIFilter { + TransactionRootFilter(TransactionRootFilter), + UserTransactionRequestFilter(UserTransactionRequestFilter), + EventFilter(EventFilter), + WriteSetChangeFilter(WriteSetChangeFilter), +} + +impl Filterable for APIFilter { + fn is_valid(&self) -> Result<(), Error> { + match self { + APIFilter::TransactionRootFilter(filter) => filter.is_valid(), + APIFilter::UserTransactionRequestFilter(filter) => 
filter.is_valid(), + APIFilter::EventFilter(filter) => filter.is_valid(), + APIFilter::WriteSetChangeFilter(filter) => filter.is_valid(), + } + } + + fn is_allowed(&self, txn: &Transaction) -> bool { + match self { + APIFilter::TransactionRootFilter(filter) => filter.is_allowed(txn), + APIFilter::UserTransactionRequestFilter(ut_filter) => txn + .txn_data + .as_ref() + .map(|txn_data| { + if let TxnData::User(u) = txn_data { + u.request + .as_ref() + .map(|req| ut_filter.is_allowed(req)) + .unwrap_or(false) + } else { + false + } + }) + .unwrap_or(false), + APIFilter::EventFilter(events_filter) => { + if let Some(txn_data) = &txn.txn_data { + let events = match txn_data { + TxnData::BlockMetadata(bm) => &bm.events, + TxnData::Genesis(g) => &g.events, + TxnData::StateCheckpoint(_) => return false, + TxnData::User(u) => &u.events, + TxnData::Validator(_) => return false, + }; + events_filter.is_allowed_vec(events) + } else { + false + } + }, + APIFilter::WriteSetChangeFilter(changes_filter) => { + changes_filter.is_allowed_opt_vec(&txn.info.as_ref().map(|inf| &inf.changes)) + }, + } + } +} + +#[derive(Debug, Deserialize, Serialize)] +#[serde(untagged)] +pub enum FilterOperator { + And(LogicalAnd), + Or(LogicalOr), + Filter(APIFilter), +} + +impl FilterOperator { + pub fn new_and(and: Vec) -> Self { + FilterOperator::And(LogicalAnd { and }) + } + + pub fn new_or(or: Vec) -> Self { + FilterOperator::Or(LogicalOr { or }) + } + + pub fn new_filter(filter: APIFilter) -> Self { + FilterOperator::Filter(filter) + } +} + +impl Filterable for FilterOperator { + fn is_valid(&self) -> Result<(), Error> { + match self { + FilterOperator::And(and) => and.is_valid(), + FilterOperator::Or(or) => or.is_valid(), + FilterOperator::Filter(filter) => filter.is_valid(), + } + } + + fn is_allowed(&self, item: &Transaction) -> bool { + match self { + FilterOperator::And(and) => and.is_allowed(item), + FilterOperator::Or(or) => or.is_allowed(item), + FilterOperator::Filter(filter) => 
filter.is_allowed(item), + } + } +} + +#[derive(Debug, Deserialize, Serialize)] +pub struct LogicalAnd { + and: Vec, +} + +impl Filterable for LogicalAnd { + fn is_valid(&self) -> Result<(), Error> { + for filter in &self.and { + filter.is_valid()?; + } + Ok(()) + } + + fn is_allowed(&self, item: &Transaction) -> bool { + self.and.iter().all(|filter| filter.is_allowed(item)) + } +} + +#[derive(Debug, Deserialize, Serialize)] +pub struct LogicalOr { + or: Vec, +} + +impl Filterable for LogicalOr { + fn is_valid(&self) -> Result<(), Error> { + for filter in &self.or { + filter.is_valid()?; + } + Ok(()) + } + + fn is_allowed(&self, item: &Transaction) -> bool { + self.or.iter().any(|filter| filter.is_allowed(item)) + } +} + +#[cfg(test)] +mod test { + use super::*; + use crate::{ + filters::{ + user_transaction_request::EntryFunctionFilter, + write_set_change_filter::{ + ChangeItemFilter, ModuleChangeFilter, ResourceChangeFilter, TableChangeFilter, + }, + MoveStructTagFilter, PositionalFilter, UserTransactionPayloadFilter, + }, + json_search::{JsonOrStringSearch, JsonSearchTerm}, + }; + use aptos_protos::indexer::v1::TransactionsInStorage; + use prost::Message; + use std::io::Read; + + // Decompress fixtures first, Ex: + + fn decompress_fixture(bytes: &[u8]) -> TransactionsInStorage { + let mut decompressor = lz4::Decoder::new(bytes).expect("Lz4 decompression failed."); + let mut decompressed = Vec::new(); + decompressor + .read_to_end(&mut decompressed) + .expect("Lz4 decompression failed."); + TransactionsInStorage::decode(decompressed.as_slice()).expect("Failed to parse transaction") + } + + #[allow(dead_code)] + fn load_taptos_fixture() -> TransactionsInStorage { + let data = include_bytes!( + "../fixtures/compressed_files_lz4_00008bc1d5adcf862d3967c1410001fb_705101000.pb.lz4" + ); + decompress_fixture(data) + } + + #[allow(dead_code)] + fn load_random_april_3mb_fixture() -> TransactionsInStorage { + let data = include_bytes!( + 
"../fixtures/compressed_files_lz4_0013c194ec4fdbfb8db7306170aac083_445907000.pb.lz4" + ); + decompress_fixture(data) + } + + #[allow(dead_code)] + fn load_graffio_fixture() -> TransactionsInStorage { + let data = include_bytes!( + "../fixtures/compressed_files_lz4_f3d880d9700c70d71fefe71aa9218aa9_301616000.pb.lz4" + ); + decompress_fixture(data) + } + + #[test] + pub fn test_query_parsing() { + let trf = TransactionRootFilter { + success: Some(true), + txn_type: Some(aptos_protos::transaction::v1::transaction::TransactionType::User), + }; + + let utrf = UserTransactionRequestFilter { + sender: Some("0x0011".into()), + payload: Some(UserTransactionPayloadFilter { + function: Some(EntryFunctionFilter { + address: Some("0x001".into()), + module: Some("module".into()), + function: Some("F".into()), + }), + arguments: Some(vec![PositionalFilter { + index: 0, + value: "0x0011".into(), + }]), + }), + }; + + let ef = EventFilter { + data: Some(JsonSearchTerm::new("spins".into(), 5.into()).unwrap()), + struct_type: Some(MoveStructTagFilter { + address: Some("0x0077".into()), + module: Some("roulette".into()), + name: Some("spin".into()), + }), + }; + + let wscf_res = WriteSetChangeFilter { + change_type: Some(aptos_protos::transaction::v1::write_set_change::Type::WriteResource), + change: Some(ChangeItemFilter::ResourceChange(ResourceChangeFilter { + resource_type: Some(MoveStructTagFilter { + address: Some("0x001af32".into()), + module: Some("airport".into()), + name: Some("airplane".into()), + }), + address: Some("0x001af32".into()), + data: Some(JsonSearchTerm::new("takeoff".into(), true.into()).unwrap()), + })), + }; + let wscf_table = WriteSetChangeFilter { + change_type: Some( + aptos_protos::transaction::v1::write_set_change::Type::WriteTableItem, + ), + change: Some(ChangeItemFilter::TableChange(TableChangeFilter { + handle: Some("0x796857465434253644536475453432453".into()), + key: Some(JsonOrStringSearch::String("table_key".into())), + key_type_str: 
Some("0x423453466345::some_module::SomeStruct".into()), + })), + }; + let wscf_mod = WriteSetChangeFilter { + change_type: Some(aptos_protos::transaction::v1::write_set_change::Type::WriteModule), + change: Some(ChangeItemFilter::ModuleChange(ModuleChangeFilter { + address: Some("0x0000098".into()), + })), + }; + + let write_set_ors = FilterOperator::new_or(vec![ + FilterOperator::Filter(APIFilter::WriteSetChangeFilter(wscf_res)), + FilterOperator::Filter(APIFilter::WriteSetChangeFilter(wscf_table)), + FilterOperator::Filter(APIFilter::WriteSetChangeFilter(wscf_mod)), + ]); + + let event_filter_or_write_set = FilterOperator::new_or(vec![ + FilterOperator::Filter(APIFilter::EventFilter(ef)), + write_set_ors, + ]); + + let transaction_root_and_request_filter = FilterOperator::new_or(vec![ + FilterOperator::Filter(APIFilter::TransactionRootFilter(trf)), + FilterOperator::Filter(APIFilter::UserTransactionRequestFilter(utrf)), + ]); + + let query = FilterOperator::new_or(vec![ + transaction_root_and_request_filter, + event_filter_or_write_set, + ]); + + println!( + "JSON RESULT: \n {}", + serde_json::to_string_pretty(&query).unwrap() + ); + + let txns = load_graffio_fixture(); + + // Benchmark how long it takes to do this LOOPS times + let start = std::time::Instant::now(); + const LOOPS: i32 = 1000; + for _ in 0..LOOPS { + for txn in &txns.transactions { + query.is_allowed(txn); + } + } + let elapsed = start.elapsed(); + + let total_txn = LOOPS * txns.transactions.len() as i32; + println!( + "BENCH: Took {:?} for {} transactions ({:?} each)", + elapsed, + total_txn, + elapsed / total_txn as u32 + ); + + let ef_econia = EventFilter { + data: None, + struct_type: Some(MoveStructTagFilter { + address: Some("0x00ECONIA".into()), + module: None, + name: None, + }), + }; + let ef_aries = EventFilter { + data: None, + struct_type: Some(MoveStructTagFilter { + address: Some("0x00ARIES".into()), + module: None, + name: None, + }), + }; + let query = FilterOperator::new_or(vec![ 
FilterOperator::Filter(APIFilter::EventFilter(ef_econia)), + FilterOperator::Filter(APIFilter::EventFilter(ef_aries)), + ]); + println!( + "JSON RESULT: \n {}", + serde_json::to_string_pretty(&query).unwrap() + ); + + //println!("Filter result for u32: {}", filter.is_allowed(&item_u32)); // true + //println!("Filter result for String: {}", filter.is_allowed(&item_s)); // false + } +} diff --git a/rust/transaction-filter/src/filters/event_filter.rs b/rust/transaction-filter/src/filters/event_filter.rs new file mode 100644 index 000000000..46dffafb2 --- /dev/null +++ b/rust/transaction-filter/src/filters/event_filter.rs @@ -0,0 +1,48 @@ +use crate::{filters::MoveStructTagFilter, json_search::JsonSearchTerm, traits::Filterable}; +use anyhow::Error; +use aptos_protos::transaction::v1::{move_type::Content, Event}; +use serde::{Deserialize, Serialize}; + +#[derive(Debug, Serialize, Deserialize, PartialEq)] +#[serde(deny_unknown_fields)] +pub struct EventFilter { + #[serde(skip_serializing_if = "Option::is_none")] + pub data: Option, + // Only for events that have a struct as their generic + #[serde(skip_serializing_if = "Option::is_none")] + pub struct_type: Option, +} + +impl Filterable for EventFilter { + fn is_valid(&self) -> Result<(), Error> { + if self.data.is_none() && self.struct_type.is_none() { + return Err(Error::msg( + "At least one of data or struct_type must be set", + )); + }; + + self.data.is_valid()?; + self.struct_type.is_valid()?; + Ok(()) + } + + fn is_allowed(&self, item: &Event) -> bool { + if !self.data.is_allowed(&item.data) { + return false; + } + + if let Some(struct_type_filter) = &self.struct_type { + if let Some(Content::Struct(struct_tag)) = + &item.r#type.as_ref().and_then(|t| t.content.as_ref()) + { + if !struct_type_filter.is_allowed(struct_tag) { + return false; + } + } else { + return false; + } + } + + true + } +} diff --git a/rust/transaction-filter/src/filters/mod.rs b/rust/transaction-filter/src/filters/mod.rs index 
f12a94846..3aca4e6cd 100644 --- a/rust/transaction-filter/src/filters/mod.rs +++ b/rust/transaction-filter/src/filters/mod.rs @@ -1,5 +1,6 @@ pub mod event; pub mod move_module; +pub mod positional; pub mod transaction_root; pub mod user_transaction_request; pub mod write_set_change_filter; @@ -7,6 +8,7 @@ pub mod write_set_change_filter; // Re-export for easier use pub use event::EventFilter; pub use move_module::{MoveModuleFilter, MoveStructTagFilter}; +pub use positional::PositionalFilter; pub use transaction_root::TransactionRootFilter; pub use user_transaction_request::{UserTransactionPayloadFilter, UserTransactionRequestFilter}; pub use write_set_change_filter::WriteSetChangeFilter; diff --git a/rust/transaction-filter/src/filters/positional.rs b/rust/transaction-filter/src/filters/positional.rs new file mode 100644 index 000000000..a8f6e3de3 --- /dev/null +++ b/rust/transaction-filter/src/filters/positional.rs @@ -0,0 +1,47 @@ +use crate::traits::Filterable; +use anyhow::Error; +use serde::{Deserialize, Serialize}; +use std::fmt::Debug; + +/// Allows matching a given value within an array of values, by index +#[derive(Debug, Deserialize, PartialEq, Serialize)] +#[serde(deny_unknown_fields)] +pub struct PositionalFilter +where + T: PartialEq + Debug, +{ + pub value: T, + pub index: usize, +} + +impl Filterable> for PositionalFilter +where + T: PartialEq + Debug, +{ + fn is_valid(&self) -> Result<(), Error> { + Ok(()) + } + + fn is_allowed(&self, items: &Vec) -> bool { + items.get(self.index).map_or(false, |v| v == &self.value) + } +} + +impl Filterable> for Vec> +where + T: PartialEq + Debug, +{ + fn is_valid(&self) -> Result<(), Error> { + if self.is_empty() { + return Err(Error::msg( + "PositionalFilter must have at least one element", + )); + } + Ok(()) + } + + fn is_allowed(&self, items: &Vec) -> bool { + self.iter() + .all(|arg| items.get(arg.index).map_or(false, |v| v == &arg.value)) + } +} diff --git 
a/rust/transaction-filter/src/filters/user_transaction_request.rs b/rust/transaction-filter/src/filters/user_transaction_request.rs index 5054a16d4..a32851495 100644 --- a/rust/transaction-filter/src/filters/user_transaction_request.rs +++ b/rust/transaction-filter/src/filters/user_transaction_request.rs @@ -1,4 +1,4 @@ -use crate::traits::Filterable; +use crate::{filters::PositionalFilter, traits::Filterable}; use anyhow::{anyhow, Error}; use aptos_protos::transaction::v1::{ multisig_transaction_payload, transaction_payload, EntryFunctionId, EntryFunctionPayload, @@ -102,21 +102,28 @@ impl Filterable for EntryFunctionFilter { pub struct UserTransactionPayloadFilter { #[serde(skip_serializing_if = "Option::is_none")] pub function: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub arguments: Option>>, + // TODO: handle type args? } impl Filterable for UserTransactionPayloadFilter { #[inline] fn validate_state(&self) -> Result<(), Error> { - if self.function.is_none() { - return Err(Error::msg("At least one of function must be set")); + if self.function.is_none() && self.arguments.is_none() { + return Err(Error::msg( + "At least one of function or arguments must be set", + )); }; self.function.is_valid()?; + self.arguments.is_valid()?; Ok(()) } #[inline] fn is_allowed(&self, payload: &EntryFunctionPayload) -> bool { self.function.is_allowed_opt(&payload.function) + && self.arguments.is_allowed(&payload.arguments) } } diff --git a/rust/transaction-filter/src/filters/write_set_change_filter.rs b/rust/transaction-filter/src/filters/write_set_change_filter.rs index 12db6f2fa..11dccb293 100644 --- a/rust/transaction-filter/src/filters/write_set_change_filter.rs +++ b/rust/transaction-filter/src/filters/write_set_change_filter.rs @@ -1,4 +1,8 @@ -use crate::{filters::MoveStructTagFilter, traits::Filterable}; +use crate::{ + filters::MoveStructTagFilter, + json_search::{JsonOrStringSearch, JsonSearchTerm}, + traits::Filterable, +}; use anyhow::Error; use 
aptos_protos::transaction::v1::{ write_set_change::Change, DeleteModule, DeleteResource, DeleteTableItem, WriteModule, @@ -22,7 +26,7 @@ impl Filterable for WriteSetChangeFilter { #[inline] fn validate_state(&self) -> Result<(), Error> { if self.change.is_none() { - return Err(Error::msg("At least one of change must be set")); + return Err(Error::msg("field change must be set")); }; self.change.is_valid()?; Ok(()) @@ -155,6 +159,9 @@ pub struct ResourceChangeFilter { pub resource_type: Option, #[serde(skip_serializing_if = "Option::is_none")] pub address: Option, + // This is only applicable to WriteResource, but I'm lazy + #[serde(skip_serializing_if = "Option::is_none")] + pub data: Option, } pub enum ResourceChange<'a> { @@ -167,10 +174,11 @@ impl Filterable> for ResourceChangeFilter { fn validate_state(&self) -> Result<(), Error> { if self.resource_type.is_none() && self.address.is_none() { return Err(Error::msg( - "At least one of resource_type or address must be set", + "At least one of resource_type, address, or data must be set", )); }; self.resource_type.is_valid()?; + self.data.is_valid()?; Ok(()) } @@ -188,6 +196,9 @@ impl Filterable> for ResourceChangeFilter { return false; } } + if self.data.is_some() { + return false; + } }, ResourceChange::WriteResource(wr) => { if let Some(address) = &self.address { @@ -200,6 +211,11 @@ impl Filterable> for ResourceChangeFilter { return false; } } + if let Some(data) = &self.data { + if !data.find(&wr.data) { + return false; + } + } }, } true @@ -242,7 +258,7 @@ impl Filterable> for ModuleChangeFilter { #[serde(deny_unknown_fields)] pub struct TableChangeFilter { pub handle: Option, - pub key: Option, + pub key: Option, pub key_type_str: Option, } @@ -255,7 +271,7 @@ impl Filterable> for TableChangeFilter { fn validate_state(&self) -> Result<(), Error> { if self.handle.is_none() && self.key.is_none() && self.key_type_str.is_none() { return Err(Error::msg( - "At least one of handle, key, or key_type must be set", + 
"At least one of handle, key, or key_type_str must be set", )); }; Ok(()) @@ -278,7 +294,11 @@ impl Filterable> for TableChangeFilter { } } if let Some(key) = &self.key { - if !dti.data.as_ref().map_or(false, |dtd| key == &dtd.key) { + if !dti + .data + .as_ref() + .map_or(false, |dtd| key.is_allowed(&dtd.key)) + { return false; } } diff --git a/rust/transaction-filter/src/json_search.rs b/rust/transaction-filter/src/json_search.rs new file mode 100644 index 000000000..b2e56eea9 --- /dev/null +++ b/rust/transaction-filter/src/json_search.rs @@ -0,0 +1,290 @@ +use crate::traits::Filterable; +use anyhow::Error; +use memchr::memmem::Finder; +use serde::{Deserialize, Serialize}; +use thiserror::Error; + +/** +Find multiple needles in a haystack. JSON parsing is relatively expensive, so we + instead treat the JSON as a string and do simple matching. + This means false positives are possible. + +Currently, we use the `memchr` crate, which is a SIMD-accelerated library for finding bytes in bytes. + This is faster than the naive `str::find`, however it does repeat work for each needle. + There are alternatives, such as Aho-Corasick algorithm (https://github.com/BurntSushi/aho-corasick), + which uses a trie-based for finding multiple needles in a haystack in a single pass. + This is a good candidate for future work. + +Given we serialize some number types as integers, and some others as strings, we have to search for both options. +This approach means: + 1. It's impossible to specify _where_ in the JSON the key/value pair is. I.e a search for a key of "address" + and value of "0x5" will match both `{"address": "0x5"}`, and `{"inner": {"address": "0x5"}}`. + 2. The above means false positives are clearly possible. Depending on ecosystem feedback and overall performance, + we may change this, and offer full json support. There are SIMD accelerated JSON parsers available, such as + https://github.com/cloudwego/sonic-rs or https://github.com/simd-lite/simd-json . 
Benchmarks will be needed to + determine if this is worth it. +*/ + +pub const MIN_KEY_TERM_LENGTH: usize = 3; + +#[derive(Error, Debug)] +pub enum JSONSearchError { + #[error("The json type `{0}` is not supported in searches")] + UnsupportedJsonType(&'static str), + #[error( + "The key name is too short, must be at least {} characters", + MIN_KEY_TERM_LENGTH + )] + KeyNameTooShort, + #[error("Invalid JSON key term")] + InvalidKeyTerm, +} + +#[derive(Clone, Debug, Deserialize, PartialEq, Serialize)] +#[serde(deny_unknown_fields)] +pub struct SerializedJsonSearchTerm { + pub key: String, + pub value: serde_json::Value, +} + +impl TryInto for SerializedJsonSearchTerm { + type Error = JSONSearchError; + + fn try_into(self) -> Result { + JsonSearchTerm::new(self.key, self.value) + } +} + +impl From for SerializedJsonSearchTerm { + fn from(term: JsonSearchTerm) -> Self { + term.original_term + } +} + +// Custom serde serialization/deserialization that uses above tryinto and from implementations +impl Serialize for JsonSearchTerm { + fn serialize(&self, serializer: S) -> Result + where + S: serde::ser::Serializer, + { + SerializedJsonSearchTerm::from(self.clone()).serialize(serializer) + } +} + +impl<'de> Deserialize<'de> for JsonSearchTerm { + fn deserialize(deserializer: D) -> Result + where + D: serde::de::Deserializer<'de>, + { + let term = SerializedJsonSearchTerm::deserialize(deserializer)?; + term.try_into().map_err(serde::de::Error::custom) + } +} + +#[derive(Clone)] +pub struct JsonSearchTerm { + original_term: SerializedJsonSearchTerm, + // We use an owned finder; while this clones the string once, this is only done once at instantiation + // Because we serialize some numbers as strings, and some as integers, we need to search for both + key_finders: Vec>, +} + +impl PartialEq for JsonSearchTerm { + fn eq(&self, other: &Self) -> bool { + self.original_term == other.original_term + } +} + +impl std::fmt::Debug for JsonSearchTerm { + fn fmt(&self, f: &mut 
std::fmt::Formatter<'_>) -> std::fmt::Result { + let keys = &self + .key_finders + .iter() + .map(|f| String::from_utf8_lossy(f.needle())); + f.debug_struct("JsonSearchTerm") + .field("key_finders", keys) + .field("original_term", &self.original_term) + .finish() + } +} + +impl JsonSearchTerm { + pub fn new(key: String, value: serde_json::Value) -> Result { + if key.len() < MIN_KEY_TERM_LENGTH { + Err(JSONSearchError::KeyNameTooShort)?; + } + let values = match value.clone() { + // We use to_string here so we get the quoted JSON value + serde_json::Value::String(s) => vec![double_encode_json_string(s)], + // For numbers, we need to search for both the string and the number. + // TODO: There is probably a better way to do this + serde_json::Value::Number(n) => vec![n.to_string(), format!("\\\"{}\\\"", n)], + serde_json::Value::Bool(b) => vec![b.to_string()], + serde_json::Value::Null => vec!["null".to_string()], + // Maybe we'll support these in the future, but it's more complicated, so for now we don't + // TODO: reconsider supporting arrays and/or other types + serde_json::Value::Array(_) => Err(JSONSearchError::UnsupportedJsonType("Array"))?, + serde_json::Value::Object(_) => Err(JSONSearchError::UnsupportedJsonType("Object"))?, + }; + + // We need to account for the fact that the key is quoted in the JSON: so we double quote. 
+ let encoded_key = double_encode_json_string(key.clone()); + // And we append the `:` to the key, and the value + let key_finders = values + .iter() + .map(|v| Finder::new(&format!("{}:{}", encoded_key, v)).into_owned()) + .collect(); + Ok(Self { + key_finders, + original_term: SerializedJsonSearchTerm { key, value }, + }) + } + + pub fn from_json(json: serde_json::Value) -> Result { + let key = json["key"] + .as_str() + .ok_or(JSONSearchError::InvalidKeyTerm)?; + let value = json["value"].clone(); + Self::new(key.to_string(), value) + } + + pub fn find(&self, haystack: &str) -> bool { + let haystack_bytes = haystack.as_bytes(); + self.key_finders + .iter() + .any(|finder| finder.find(haystack_bytes).is_some()) + } +} + +impl Filterable for JsonSearchTerm { + fn is_valid(&self) -> Result<(), Error> { + // Validation is performed elsewhere + Ok(()) + } + + fn is_allowed(&self, item: &String) -> bool { + self.find(item) + } +} + +#[derive(Debug, Deserialize, PartialEq, Serialize)] +#[serde(deny_unknown_fields)] +#[serde(untagged)] +pub enum JsonOrStringSearch { + Json(JsonSearchTerm), + String(String), +} + +impl Filterable for JsonOrStringSearch { + fn is_valid(&self) -> Result<(), Error> { + match self { + JsonOrStringSearch::Json(json) => json.is_valid(), + JsonOrStringSearch::String(_) => Ok(()), + } + } + + fn is_allowed(&self, item: &String) -> bool { + match self { + JsonOrStringSearch::Json(json) => json.is_allowed(item), + JsonOrStringSearch::String(s) => item == s, + } + } +} + +pub fn double_encode_json_string(s: String) -> String { + let s = serde_json::Value::String(s.to_string()).to_string(); + let s = serde_json::Value::String(s).to_string(); + // Then we remove the leading and trailing quotes + s[1..s.len() - 1].to_string() +} + +#[cfg(test)] +mod tests { + use super::*; + use serde_json::json; + use std::borrow::Cow; + + #[test] + fn test_double_encode_json_string() { + let s = "hello"; + let expected = "\\\"hello\\\""; + assert_eq!(expected, 
double_encode_json_string(s.to_string())); + } + + fn print_search_debug(needle: &JsonSearchTerm) { + let needles_str = needle + .key_finders + .iter() + .map(|f| String::from_utf8_lossy(f.needle())) + .collect::>>() + .join(", "); + println!("Needles: {}", needles_str); + } + + fn search_test_case(test_json: &serde_json::Value, key: &str, value: serde_json::Value) { + let test_json = serde_json::Value::String(test_json.to_string()).to_string(); + + println!( + "\n==== Searching for `{}: {}` in `{}` ====", + key, value, test_json + ); + + let needle = JsonSearchTerm::new(key.into(), value).unwrap(); + print_search_debug(&needle); + println!("Haystack: {}", test_json); + assert!(needle.find(&test_json), "Failed to find needle in haystack"); + } + + #[test] + fn test_json_search() { + let test_json = json!( + { + // String and nested string + "address": "0x3", + "inner": { + "b": "c", + "address": "0x5" + }, + + // Null + "nullval": null, + + // Numbers + "somenum": 5, + "bignum": "101", + + // Bools + "trueval": true, + "falseval": false, + } + ); + + // String and nested string + search_test_case( + &test_json, + "address", + serde_json::Value::String("0x3".into()), + ); + + search_test_case( + &test_json, + "address", + serde_json::Value::String("0x5".into()), + ); + + // Null + search_test_case(&test_json, "nullval", serde_json::Value::Null); + + // Numbers + search_test_case(&test_json, "somenum", serde_json::Value::Number(5.into())); + search_test_case( + &test_json, + "bignum", + serde_json::Value::String("101".into()), + ); + + // Bools + search_test_case(&test_json, "trueval", serde_json::Value::Bool(true)); + search_test_case(&test_json, "falseval", serde_json::Value::Bool(false)); + } +} diff --git a/rust/transaction-filter/src/lib.rs b/rust/transaction-filter/src/lib.rs index 9bcb9713c..387f8b9f6 100644 --- a/rust/transaction-filter/src/lib.rs +++ b/rust/transaction-filter/src/lib.rs @@ -1,12 +1,16 @@ -mod api_filter; +pub mod filter_operator; +pub mod 
api_filter; pub mod filters; +pub mod json_search; pub mod traits; /** The goal of transaction filtering is to be able to save resources downstream of wherever filtering is used. For this to be true, the filtering itself must be fast, and so we do a few things: - 1. We avoid clones, copies, etc as much as possible - 2. We do a single pass over the transaction data + 1. JSON fields are not parsed. Instead, we treat them as strings, and do simple matching. *This means + false positives are possible*. This may change in the future, but JSON parsing is not cheap. + 2. We avoid clones, copies, etc as much as possible + 3. We do a single pass over the transaction data There are four different parts of a transaction that are queryable: 1. The "root" level. This includes: @@ -17,12 +21,15 @@ There are four different parts of a transaction that are queryable: - Payload: we only support the entry function payload - Entry function (address, module, name) - Entry function ID string + - Arbitrary JSON data 3. Events. Each event has: - Key - Type + - Arbitrary JSON data 4. WriteSet Changes. 
Each change may have: - Type - Address + - Arbitrary JSON data **/ #[cfg(test)] From a74334bc21f1af6ee83b3d27fc7f71ff01996bb6 Mon Sep 17 00:00:00 2001 From: CapCap Date: Fri, 31 May 2024 13:30:52 -0700 Subject: [PATCH 2/7] add documentation and memchr/serde bench --- rust/transaction-filter/src/filters/event.rs | 15 ++- .../src/filters/event_filter.rs | 48 ------- rust/transaction-filter/src/json_search.rs | 119 +++++++++++++++++- 3 files changed, 130 insertions(+), 52 deletions(-) delete mode 100644 rust/transaction-filter/src/filters/event_filter.rs diff --git a/rust/transaction-filter/src/filters/event.rs b/rust/transaction-filter/src/filters/event.rs index 1012f6b6b..6b7fc14a7 100644 --- a/rust/transaction-filter/src/filters/event.rs +++ b/rust/transaction-filter/src/filters/event.rs @@ -1,4 +1,4 @@ -use crate::{filters::MoveStructTagFilter, traits::Filterable}; +use crate::{filters::MoveStructTagFilter, json_search::JsonSearchTerm, traits::Filterable}; use anyhow::Error; use aptos_protos::transaction::v1::{move_type::Content, Event}; use serde::{Deserialize, Serialize}; @@ -6,6 +6,8 @@ use serde::{Deserialize, Serialize}; #[derive(Debug, Serialize, Deserialize, PartialEq)] #[serde(deny_unknown_fields)] pub struct EventFilter { + #[serde(skip_serializing_if = "Option::is_none")] + pub data: Option, // Only for events that have a struct as their generic #[serde(skip_serializing_if = "Option::is_none")] pub struct_type: Option, @@ -14,10 +16,13 @@ pub struct EventFilter { impl Filterable for EventFilter { #[inline] fn validate_state(&self) -> Result<(), Error> { - if self.struct_type.is_none() { - return Err(Error::msg("At least one of struct_type must be set")); + if self.data.is_none() && self.struct_type.is_none() { + return Err(Error::msg( + "At least one of data or struct_type must be set", + )); }; + self.data.is_valid()?; self.struct_type.is_valid()?; Ok(()) } @@ -36,6 +41,10 @@ impl Filterable for EventFilter { } } + if !self.data.is_allowed(&item.data) { 
+ return false; + } + true } } diff --git a/rust/transaction-filter/src/filters/event_filter.rs b/rust/transaction-filter/src/filters/event_filter.rs deleted file mode 100644 index 46dffafb2..000000000 --- a/rust/transaction-filter/src/filters/event_filter.rs +++ /dev/null @@ -1,48 +0,0 @@ -use crate::{filters::MoveStructTagFilter, json_search::JsonSearchTerm, traits::Filterable}; -use anyhow::Error; -use aptos_protos::transaction::v1::{move_type::Content, Event}; -use serde::{Deserialize, Serialize}; - -#[derive(Debug, Serialize, Deserialize, PartialEq)] -#[serde(deny_unknown_fields)] -pub struct EventFilter { - #[serde(skip_serializing_if = "Option::is_none")] - pub data: Option, - // Only for events that have a struct as their generic - #[serde(skip_serializing_if = "Option::is_none")] - pub struct_type: Option, -} - -impl Filterable for EventFilter { - fn is_valid(&self) -> Result<(), Error> { - if self.data.is_none() && self.struct_type.is_none() { - return Err(Error::msg( - "At least one of data or struct_type must be set", - )); - }; - - self.data.is_valid()?; - self.struct_type.is_valid()?; - Ok(()) - } - - fn is_allowed(&self, item: &Event) -> bool { - if !self.data.is_allowed(&item.data) { - return false; - } - - if let Some(struct_type_filter) = &self.struct_type { - if let Some(Content::Struct(struct_tag)) = - &item.r#type.as_ref().and_then(|t| t.content.as_ref()) - { - if !struct_type_filter.is_allowed(struct_tag) { - return false; - } - } else { - return false; - } - } - - true - } -} diff --git a/rust/transaction-filter/src/json_search.rs b/rust/transaction-filter/src/json_search.rs index b2e56eea9..f32e7f016 100644 --- a/rust/transaction-filter/src/json_search.rs +++ b/rust/transaction-filter/src/json_search.rs @@ -202,7 +202,7 @@ pub fn double_encode_json_string(s: String) -> String { mod tests { use super::*; use serde_json::json; - use std::borrow::Cow; + use std::{borrow::Cow, str::FromStr}; #[test] fn test_double_encode_json_string() { @@ 
-287,4 +287,121 @@ mod tests { search_test_case(&test_json, "trueval", serde_json::Value::Bool(true)); search_test_case(&test_json, "falseval", serde_json::Value::Bool(false)); } + + /** + For searching `inner.address: 0x5` in a singly nested json, the results are: + ```text + BENCH: Memchr took 19.75µs for 1000 iters (19ns each) + BENCH: Serde Search took 1.82875ms for 1000 iters (1.828µs each) + Memchr is 96x faster than Serde + ``` + + If we double that json- i.e add an inner2 with the contents of test_json, we get: + ```text + BENCH: Memchr took 54.334µs for 1000 iters (54ns each) + BENCH: Serde Search took 14.213292ms for 1000 iters (14.213µs each) + Memchr is 263x faster than Serde + ``` + + This is excluding memory allocation, for which serde Value is [not great](https://github.com/serde-rs/json/issues/635) + + if we look at something like graffio txns, we'd be looking at more than three orders of magnitude in difference + The main problem/optimization is that memchr does a tiny bit of work on startup (few ns), but is then re-used forever; + as long as the stream remains, the per json search is relatively constant, because it's just so fast, and our jsons are relatively small + + Serde however is not: it scales pretty linearly (and then some), and so the larger the json, the bigger the delta + + Whether we care about an extra 25-50ms per batch is a different story and this is, of course, with serde; + it's possible using one of the more efficient json parsers I looked into we could shave, maybe, [20-30% off that time](https://github.com/serde-rs/json-benchmark) + **/ + #[test] + fn test_bench_json_search_vs_serde() { + let mut test_json = json!( + { + // String and nested string + "address": "0x3", + "inner": { + "b": "c", + "address": "0x5" + }, + + // Null + "nullval": null, + + // Numbers + "somenum": 5, + "bignum": "101", + + // Bools + "trueval": true, + "falseval": false, + } + ); + + let test_json_clone = test_json.clone(); + test_json + 
.get_mut("inner") + .unwrap() + .as_object_mut() + .unwrap() + .insert("inner2".to_string(), test_json_clone); + + let test_json_encoded = serde_json::Value::String(test_json.to_string()).to_string(); + + let needle = + JsonSearchTerm::new("address".into(), serde_json::Value::String("0x5".into())).unwrap(); + + let start = std::time::Instant::now(); + + const ITERATIONS: usize = 1000; + for _ in 0..ITERATIONS { + needle.find(&test_json_encoded); + } + + let elapsed = start.elapsed(); + let memchr_average = elapsed / ITERATIONS as u32; + println!( + "BENCH: Memchr took {:?} for {} iters ({:?} each)", + elapsed, ITERATIONS, memchr_average + ); + + let json_search_term = ["inner".to_string(), "address".to_string()]; + let json_search_value = serde_json::Value::String("0x5".into()); + for _ in 0..ITERATIONS { + let test_json_serval = serde_json::Value::from_str(&test_json_encoded).unwrap(); + let test_json_serval = + serde_json::Value::from_str(test_json_serval.as_str().unwrap()).unwrap(); + + if !test_json_serval.is_object() { + panic!("Expected object"); + } + let mut current = &test_json_serval; + for key in json_search_term.iter() { + if let Some(next) = current.get(key) { + current = next; + } else { + break; + } + } + + // Ensure we found the value + if current != &json_search_value { + panic!( + "Failed to find needle in haystack: \n{:} \n{:} \n<<<", + current, json_search_value + ); + } + } + let elapsed = start.elapsed(); + let serde_average = elapsed / ITERATIONS as u32; + println!( + "BENCH: Serde Search took {:?} for {} iters ({:?} each)", + elapsed, ITERATIONS, serde_average + ); + + println!( + "Memchr is {:?}x faster than Serde", + serde_average.as_nanos() / memchr_average.as_nanos() + ); + } } From 68374d2d39daf58364ebf8e58919cf9a98632f28 Mon Sep 17 00:00:00 2001 From: CapCap Date: Mon, 10 Jun 2024 19:11:22 -0700 Subject: [PATCH 3/7] rebase complete --- rust/transaction-filter/Cargo.toml | 2 +- rust/transaction-filter/src/api_filter.rs | 84 
++++++------------- .../transaction-filter/src/filter_operator.rs | 77 +++++++---------- .../src/filters/positional.rs | 4 +- rust/transaction-filter/src/json_search.rs | 4 +- rust/transaction-filter/src/lib.rs | 5 +- rust/transaction-filter/src/test_lib/mod.rs | 36 ++++++++ 7 files changed, 101 insertions(+), 111 deletions(-) create mode 100644 rust/transaction-filter/src/test_lib/mod.rs diff --git a/rust/transaction-filter/Cargo.toml b/rust/transaction-filter/Cargo.toml index 2d68aa40e..33d72e3e8 100644 --- a/rust/transaction-filter/Cargo.toml +++ b/rust/transaction-filter/Cargo.toml @@ -27,6 +27,6 @@ serde_json = { workspace = true } thiserror = { workspace = true } [dev-dependencies] -# we only decompress the fixture protos in tests +# we only decompress the fixture protos in test lz4 = "1.24.0" diff --git a/rust/transaction-filter/src/api_filter.rs b/rust/transaction-filter/src/api_filter.rs index d29ebbc87..5cb94caac 100644 --- a/rust/transaction-filter/src/api_filter.rs +++ b/rust/transaction-filter/src/api_filter.rs @@ -110,57 +110,24 @@ impl Filterable for PublicOrApiFilter { #[cfg(test)] mod test { use super::*; - use crate::filters::{ - user_transaction_request::EntryFunctionFilter, - write_set_change_filter::{ - ChangeItemFilter, ModuleChangeFilter, ResourceChangeFilter, TableChangeFilter, + use crate::{ + filters::{ + user_transaction_request::EntryFunctionFilter, + write_set_change_filter::{ + ChangeItemFilter, ModuleChangeFilter, ResourceChangeFilter, TableChangeFilter, + }, + MoveStructTagFilter, PositionalFilter, UserTransactionPayloadFilter, }, - MoveStructTagFilter, UserTransactionPayloadFilter, + json_search::{JsonOrStringSearch, JsonSearchTerm}, + test_lib::{load_graffio_fixture, load_random_april_3mb_fixture, load_taptos_fixture}, }; use aptos_protos::indexer::v1::TransactionsInStorage; - use prost::Message; - use std::io::Read; - - // Decompress fixtures first, Ex: - - fn decompress_fixture(bytes: &[u8]) -> TransactionsInStorage { - let mut 
decompressor = lz4::Decoder::new(bytes).expect("Lz4 decompression failed."); - let mut decompressed = Vec::new(); - decompressor - .read_to_end(&mut decompressed) - .expect("Lz4 decompression failed."); - TransactionsInStorage::decode(decompressed.as_slice()).expect("Failed to parse transaction") - } - - #[allow(dead_code)] - fn load_taptos_fixture() -> TransactionsInStorage { - let data = include_bytes!( - "../fixtures/compressed_files_lz4_00008bc1d5adcf862d3967c1410001fb_705101000.pb.lz4" - ); - decompress_fixture(data) - } - - #[allow(dead_code)] - fn load_random_april_3mb_fixture() -> TransactionsInStorage { - let data = include_bytes!( - "../fixtures/compressed_files_lz4_0013c194ec4fdbfb8db7306170aac083_445907000.pb.lz4" - ); - decompress_fixture(data) - } - - #[allow(dead_code)] - fn load_graffio_fixture() -> TransactionsInStorage { - let data = include_bytes!( - "../fixtures/compressed_files_lz4_f3d880d9700c70d71fefe71aa9218aa9_301616000.pb.lz4" - ); - decompress_fixture(data) - } #[test] pub fn test_query_parsing() { let trf = TransactionRootFilter { - success: Some(false), - txn_type: None, + success: Some(true), + txn_type: Some(aptos_protos::transaction::v1::transaction::TransactionType::User), }; let utrf = UserTransactionRequestFilter { @@ -171,28 +138,28 @@ mod test { module: Some("module".into()), function: Some("F".into()), }), + arguments: Some(vec![PositionalFilter { + index: 0, + value: "0x0011".into(), + }]), }), }; - let ef = EventFilter { + let ef1 = EventFilter { + data: Some(JsonSearchTerm::new("spins".into(), 5.into()).unwrap()), struct_type: Some(MoveStructTagFilter { address: Some("0x0077".into()), module: Some("roulette".into()), name: Some("spin".into()), }), }; - let ef_econia = EventFilter { - struct_type: Some(MoveStructTagFilter { - address: Some("0x00ECONIA".into()), - module: None, - name: None, - }), - }; - let ef_aries = EventFilter { + + let ef2 = EventFilter { + data: Some(JsonSearchTerm::new("debt".into(), 
12.into()).unwrap()), struct_type: Some(MoveStructTagFilter { - address: Some("0x00ARIES".into()), - module: None, - name: None, + address: Some("0x0052".into()), + module: Some("lending".into()), + name: Some("borrow_to_gamble".into()), }), }; @@ -204,12 +171,13 @@ mod test { name: Some("airplane".into()), }), address: Some("0x001af32".into()), + data: Some(JsonSearchTerm::new("takeoff".into(), true.into()).unwrap()), })), }; let wscf_table = WriteSetChangeFilter { change: Some(ChangeItemFilter::TableChange(TableChangeFilter { handle: Some("0x796857465434253644536475453432453".into()), - key: Some("table_key".into()), + key: Some(JsonOrStringSearch::String("table_key".into())), key_type_str: Some("0x423453466345::some_module::SomeStruct".into()), })), }; @@ -222,7 +190,7 @@ mod test { let query = PublicOrApiFilter { root_filter: Some(trf), user_transaction_filter: Some(utrf), - event_filter: Some(vec![ef, ef_econia, ef_aries]), + event_filter: Some(vec![ef1, ef2]), write_set_change_filter: Some(vec![wscf_res, wscf_table, wscf_mod]), }; diff --git a/rust/transaction-filter/src/filter_operator.rs b/rust/transaction-filter/src/filter_operator.rs index 265820be1..4d5d875a8 100644 --- a/rust/transaction-filter/src/filter_operator.rs +++ b/rust/transaction-filter/src/filter_operator.rs @@ -22,7 +22,7 @@ pub enum APIFilter { } impl Filterable for APIFilter { - fn is_valid(&self) -> Result<(), Error> { + fn validate_state(&self) -> Result<(), Error> { match self { APIFilter::TransactionRootFilter(filter) => filter.is_valid(), APIFilter::UserTransactionRequestFilter(filter) => filter.is_valid(), @@ -74,6 +74,7 @@ impl Filterable for APIFilter { pub enum FilterOperator { And(LogicalAnd), Or(LogicalOr), + Not(LogicalNot), Filter(APIFilter), } @@ -86,16 +87,21 @@ impl FilterOperator { FilterOperator::Or(LogicalOr { or }) } + pub fn new_not(not: Vec) -> Self { + FilterOperator::Not(LogicalNot { not }) + } + pub fn new_filter(filter: APIFilter) -> Self { 
FilterOperator::Filter(filter) } } impl Filterable for FilterOperator { - fn is_valid(&self) -> Result<(), Error> { + fn validate_state(&self) -> Result<(), Error> { match self { FilterOperator::And(and) => and.is_valid(), FilterOperator::Or(or) => or.is_valid(), + FilterOperator::Not(not) => not.is_valid(), FilterOperator::Filter(filter) => filter.is_valid(), } } @@ -104,6 +110,7 @@ impl Filterable for FilterOperator { match self { FilterOperator::And(and) => and.is_allowed(item), FilterOperator::Or(or) => or.is_allowed(item), + FilterOperator::Not(not) => not.is_allowed(item), FilterOperator::Filter(filter) => filter.is_allowed(item), } } @@ -115,7 +122,7 @@ pub struct LogicalAnd { } impl Filterable for LogicalAnd { - fn is_valid(&self) -> Result<(), Error> { + fn validate_state(&self) -> Result<(), Error> { for filter in &self.and { filter.is_valid()?; } @@ -133,7 +140,7 @@ pub struct LogicalOr { } impl Filterable for LogicalOr { - fn is_valid(&self) -> Result<(), Error> { + fn validate_state(&self) -> Result<(), Error> { for filter in &self.or { filter.is_valid()?; } @@ -145,6 +152,24 @@ impl Filterable for LogicalOr { } } +#[derive(Debug, Deserialize, Serialize)] +pub struct LogicalNot { + not: Vec, +} + +impl Filterable for LogicalNot { + fn validate_state(&self) -> Result<(), Error> { + for filter in &self.not { + filter.is_valid()?; + } + Ok(()) + } + + fn is_allowed(&self, item: &Transaction) -> bool { + !self.not.iter().any(|filter| filter.is_allowed(item)) + } +} + #[cfg(test)] mod test { use super::*; @@ -157,45 +182,8 @@ mod test { MoveStructTagFilter, PositionalFilter, UserTransactionPayloadFilter, }, json_search::{JsonOrStringSearch, JsonSearchTerm}, + test_lib::load_graffio_fixture, }; - use aptos_protos::indexer::v1::TransactionsInStorage; - use prost::Message; - use std::io::Read; - - // Decompress fixtures first, Ex: - - fn decompress_fixture(bytes: &[u8]) -> TransactionsInStorage { - let mut decompressor = lz4::Decoder::new(bytes).expect("Lz4 
decompression failed."); - let mut decompressed = Vec::new(); - decompressor - .read_to_end(&mut decompressed) - .expect("Lz4 decompression failed."); - TransactionsInStorage::decode(decompressed.as_slice()).expect("Failed to parse transaction") - } - - #[allow(dead_code)] - fn load_taptos_fixture() -> TransactionsInStorage { - let data = include_bytes!( - "../fixtures/compressed_files_lz4_00008bc1d5adcf862d3967c1410001fb_705101000.pb.lz4" - ); - decompress_fixture(data) - } - - #[allow(dead_code)] - fn load_random_april_3mb_fixture() -> TransactionsInStorage { - let data = include_bytes!( - "../fixtures/compressed_files_lz4_0013c194ec4fdbfb8db7306170aac083_445907000.pb.lz4" - ); - decompress_fixture(data) - } - - #[allow(dead_code)] - fn load_graffio_fixture() -> TransactionsInStorage { - let data = include_bytes!( - "../fixtures/compressed_files_lz4_f3d880d9700c70d71fefe71aa9218aa9_301616000.pb.lz4" - ); - decompress_fixture(data) - } #[test] pub fn test_query_parsing() { @@ -229,7 +217,6 @@ mod test { }; let wscf_res = WriteSetChangeFilter { - change_type: Some(aptos_protos::transaction::v1::write_set_change::Type::WriteResource), change: Some(ChangeItemFilter::ResourceChange(ResourceChangeFilter { resource_type: Some(MoveStructTagFilter { address: Some("0x001af32".into()), @@ -241,9 +228,6 @@ mod test { })), }; let wscf_table = WriteSetChangeFilter { - change_type: Some( - aptos_protos::transaction::v1::write_set_change::Type::WriteTableItem, - ), change: Some(ChangeItemFilter::TableChange(TableChangeFilter { handle: Some("0x796857465434253644536475453432453".into()), key: Some(JsonOrStringSearch::String("table_key".into())), @@ -251,7 +235,6 @@ mod test { })), }; let wscf_mod = WriteSetChangeFilter { - change_type: Some(aptos_protos::transaction::v1::write_set_change::Type::WriteModule), change: Some(ChangeItemFilter::ModuleChange(ModuleChangeFilter { address: Some("0x0000098".into()), })), diff --git a/rust/transaction-filter/src/filters/positional.rs 
b/rust/transaction-filter/src/filters/positional.rs index a8f6e3de3..296085131 100644 --- a/rust/transaction-filter/src/filters/positional.rs +++ b/rust/transaction-filter/src/filters/positional.rs @@ -18,7 +18,7 @@ impl Filterable> for PositionalFilter where T: PartialEq + Debug, { - fn is_valid(&self) -> Result<(), Error> { + fn validate_state(&self) -> Result<(), Error> { Ok(()) } @@ -31,7 +31,7 @@ impl Filterable> for Vec> where T: PartialEq + Debug, { - fn is_valid(&self) -> Result<(), Error> { + fn validate_state(&self) -> Result<(), Error> { if self.is_empty() { return Err(Error::msg( "PositionalFilter must have at least one element", diff --git a/rust/transaction-filter/src/json_search.rs b/rust/transaction-filter/src/json_search.rs index f32e7f016..54e538698 100644 --- a/rust/transaction-filter/src/json_search.rs +++ b/rust/transaction-filter/src/json_search.rs @@ -157,7 +157,7 @@ impl JsonSearchTerm { } impl Filterable for JsonSearchTerm { - fn is_valid(&self) -> Result<(), Error> { + fn validate_state(&self) -> Result<(), Error> { // Validation is performed elsewhere Ok(()) } @@ -176,7 +176,7 @@ pub enum JsonOrStringSearch { } impl Filterable for JsonOrStringSearch { - fn is_valid(&self) -> Result<(), Error> { + fn validate_state(&self) -> Result<(), Error> { match self { JsonOrStringSearch::Json(json) => json.is_valid(), JsonOrStringSearch::String(_) => Ok(()), diff --git a/rust/transaction-filter/src/lib.rs b/rust/transaction-filter/src/lib.rs index 387f8b9f6..1f7766fe7 100644 --- a/rust/transaction-filter/src/lib.rs +++ b/rust/transaction-filter/src/lib.rs @@ -1,9 +1,12 @@ -pub mod filter_operator; pub mod api_filter; +pub mod filter_operator; pub mod filters; pub mod json_search; pub mod traits; +#[cfg(test)] +pub mod test_lib; + /** The goal of transaction filtering is to be able to save resources downstream of wherever filtering is used. 
For this to be true, the filtering itself must be fast, and so we do a few things: diff --git a/rust/transaction-filter/src/test_lib/mod.rs b/rust/transaction-filter/src/test_lib/mod.rs new file mode 100644 index 000000000..d7672730e --- /dev/null +++ b/rust/transaction-filter/src/test_lib/mod.rs @@ -0,0 +1,36 @@ +use aptos_protos::indexer::v1::TransactionsInStorage; +use prost::Message; +use std::io::Read; + +pub fn decompress_fixture(bytes: &[u8]) -> TransactionsInStorage { + let mut decompressor = lz4::Decoder::new(bytes).expect("Lz4 decompression failed."); + let mut decompressed = Vec::new(); + decompressor + .read_to_end(&mut decompressed) + .expect("Lz4 decompression failed."); + TransactionsInStorage::decode(decompressed.as_slice()).expect("Failed to parse transaction") +} + +#[allow(dead_code)] +pub fn load_taptos_fixture() -> TransactionsInStorage { + let data = include_bytes!( + "../../fixtures/compressed_files_lz4_00008bc1d5adcf862d3967c1410001fb_705101000.pb.lz4" + ); + decompress_fixture(data) +} + +#[allow(dead_code)] +pub fn load_random_april_3mb_fixture() -> TransactionsInStorage { + let data = include_bytes!( + "../../fixtures/compressed_files_lz4_0013c194ec4fdbfb8db7306170aac083_445907000.pb.lz4" + ); + decompress_fixture(data) +} + +#[allow(dead_code)] +pub fn load_graffio_fixture() -> TransactionsInStorage { + let data = include_bytes!( + "../../fixtures/compressed_files_lz4_f3d880d9700c70d71fefe71aa9218aa9_301616000.pb.lz4" + ); + decompress_fixture(data) +} From fa5a3e028a431eb0872c7047458867a7e4ddd595 Mon Sep 17 00:00:00 2001 From: CapCap Date: Mon, 10 Jun 2024 20:52:34 -0700 Subject: [PATCH 4/7] error trace --- rust/Cargo.lock | 8 +- rust/transaction-filter/Cargo.toml | 1 - rust/transaction-filter/src/api_filter.rs | 8 +- rust/transaction-filter/src/errors.rs | 81 +++++++++++++++++++ .../transaction-filter/src/filter_operator.rs | 13 ++- rust/transaction-filter/src/filters/event.rs | 11 +-- .../src/filters/move_module.rs | 14 ++-- 
.../src/filters/positional.rs | 14 ++-- .../src/filters/transaction_root.rs | 8 +- .../src/filters/user_transaction_request.rs | 18 ++--- .../src/filters/write_set_change_filter.rs | 27 ++++--- rust/transaction-filter/src/json_search.rs | 7 +- rust/transaction-filter/src/lib.rs | 1 + rust/transaction-filter/src/traits.rs | 60 +++++++++----- 14 files changed, 182 insertions(+), 89 deletions(-) create mode 100644 rust/transaction-filter/src/errors.rs diff --git a/rust/Cargo.lock b/rust/Cargo.lock index 57990266f..24a3972b8 100644 --- a/rust/Cargo.lock +++ b/rust/Cargo.lock @@ -2828,18 +2828,18 @@ dependencies = [ [[package]] name = "serde" -version = "1.0.193" +version = "1.0.203" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "25dd9975e68d0cb5aa1120c288333fc98731bd1dd12f561e468ea4728c042b89" +checksum = "7253ab4de971e72fb7be983802300c30b5a7f0c2e56fab8abfc6a214307c0094" dependencies = [ "serde_derive", ] [[package]] name = "serde_derive" -version = "1.0.193" +version = "1.0.203" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "43576ca501357b9b071ac53cdc7da8ef0cbd9493d8df094cd821777ea6e894d3" +checksum = "500cbc0ebeb6f46627f50f3f5811ccf6bf00643be300b4c3eabc0ef55dc5b5ba" dependencies = [ "proc-macro2", "quote", diff --git a/rust/transaction-filter/Cargo.toml b/rust/transaction-filter/Cargo.toml index 33d72e3e8..b08215c52 100644 --- a/rust/transaction-filter/Cargo.toml +++ b/rust/transaction-filter/Cargo.toml @@ -15,7 +15,6 @@ rust-version = { workspace = true } [dependencies] anyhow = { workspace = true } aptos-protos = { workspace = true } - # SIMD for string search. 
TODO: benchmark this on various real inputs to see if it's worth it memchr = { workspace = true } diff --git a/rust/transaction-filter/src/api_filter.rs b/rust/transaction-filter/src/api_filter.rs index 5cb94caac..90cbc0cce 100644 --- a/rust/transaction-filter/src/api_filter.rs +++ b/rust/transaction-filter/src/api_filter.rs @@ -1,11 +1,11 @@ // use crate::traits::Filterable; use crate::{ + errors::FilterError, filters::{ EventFilter, TransactionRootFilter, UserTransactionRequestFilter, WriteSetChangeFilter, }, traits::Filterable, }; -use anyhow::Error; use aptos_protos::transaction::v1::{transaction::TxnData, Transaction}; use serde::{Deserialize, Serialize}; use std::fmt::Debug; @@ -26,13 +26,15 @@ pub struct PublicOrApiFilter { } impl Filterable for PublicOrApiFilter { - fn validate_state(&self) -> Result<(), Error> { + fn validate_state(&self) -> Result<(), FilterError> { if self.root_filter.is_none() && self.user_transaction_filter.is_none() && self.event_filter.is_none() && self.write_set_change_filter.is_none() { - return Err(Error::msg("At least one of root_filter, user_transaction_filter, event_filter or write_set_change_filter must be set")); + return Err(anyhow::anyhow!( + "At least one of root_filter, user_transaction_filter, event_filter, or write_set_change_filter must be set" + ).into()); }; self.root_filter.is_valid()?; diff --git a/rust/transaction-filter/src/errors.rs b/rust/transaction-filter/src/errors.rs new file mode 100644 index 000000000..a6289cf40 --- /dev/null +++ b/rust/transaction-filter/src/errors.rs @@ -0,0 +1,81 @@ +use serde::{Serialize, Serializer}; +use std::fmt::Display; +use thiserror::Error as ThisError; + +#[derive(Debug, Serialize)] +pub struct FilterStepTrace { + pub serialized_filter: String, + pub filter_type: String, +} + +impl Display for FilterStepTrace { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}: {}", self.filter_type, self.serialized_filter) + } +} +#[derive(Debug)] +pub 
struct SerializableError { + pub inner: Box, +} + +/// Custom error that allows for keeping track of the filter type/path that caused the error +#[derive(Debug, Serialize, ThisError)] +pub struct FilterError { + pub filter_path: Vec, + pub error: SerializableError, +} + +impl FilterError { + pub fn new(error: Box) -> Self { + Self { + filter_path: Vec::new(), + error: SerializableError::new(error), + } + } + + pub fn add_trace(&mut self, serialized_filter: String, filter_type: String) { + self.filter_path.push(FilterStepTrace { + serialized_filter, + filter_type, + }); + } +} + +impl From for FilterError { + fn from(error: anyhow::Error) -> Self { + Self::new(error.into()) + } +} + +impl Display for FilterError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let trace_path = self + .filter_path + .iter() + .map(|trace| format!("{}", trace)) + .collect::>() + .join("\n"); + write!( + f, + "Filter Error: {:?}\nTrace Path:\n{}", + self.error.inner, trace_path + ) + } +} + +impl SerializableError { + fn new(error: Box) -> Self { + SerializableError { inner: error } + } +} + +// Implement Serialize for the wrapper +impl Serialize for SerializableError { + fn serialize(&self, serializer: S) -> Result + where + S: Serializer, + { + // Serialize the error as its string representation + serializer.serialize_str(&self.inner.to_string()) + } +} diff --git a/rust/transaction-filter/src/filter_operator.rs b/rust/transaction-filter/src/filter_operator.rs index 4d5d875a8..7f41d90ca 100644 --- a/rust/transaction-filter/src/filter_operator.rs +++ b/rust/transaction-filter/src/filter_operator.rs @@ -1,11 +1,10 @@ -// use crate::traits::Filterable; use crate::{ + errors::FilterError, filters::{ EventFilter, TransactionRootFilter, UserTransactionRequestFilter, WriteSetChangeFilter, }, traits::Filterable, }; -use anyhow::Error; use aptos_protos::transaction::v1::{transaction::TxnData, Transaction}; use serde::{Deserialize, Serialize}; use std::fmt::Debug; @@ 
-22,7 +21,7 @@ pub enum APIFilter { } impl Filterable for APIFilter { - fn validate_state(&self) -> Result<(), Error> { + fn validate_state(&self) -> Result<(), FilterError> { match self { APIFilter::TransactionRootFilter(filter) => filter.is_valid(), APIFilter::UserTransactionRequestFilter(filter) => filter.is_valid(), @@ -97,7 +96,7 @@ impl FilterOperator { } impl Filterable for FilterOperator { - fn validate_state(&self) -> Result<(), Error> { + fn validate_state(&self) -> Result<(), FilterError> { match self { FilterOperator::And(and) => and.is_valid(), FilterOperator::Or(or) => or.is_valid(), @@ -122,7 +121,7 @@ pub struct LogicalAnd { } impl Filterable for LogicalAnd { - fn validate_state(&self) -> Result<(), Error> { + fn validate_state(&self) -> Result<(), FilterError> { for filter in &self.and { filter.is_valid()?; } @@ -140,7 +139,7 @@ pub struct LogicalOr { } impl Filterable for LogicalOr { - fn validate_state(&self) -> Result<(), Error> { + fn validate_state(&self) -> Result<(), FilterError> { for filter in &self.or { filter.is_valid()?; } @@ -158,7 +157,7 @@ pub struct LogicalNot { } impl Filterable for LogicalNot { - fn validate_state(&self) -> Result<(), Error> { + fn validate_state(&self) -> Result<(), FilterError> { for filter in &self.not { filter.is_valid()?; } diff --git a/rust/transaction-filter/src/filters/event.rs b/rust/transaction-filter/src/filters/event.rs index 6b7fc14a7..a5fa98ccb 100644 --- a/rust/transaction-filter/src/filters/event.rs +++ b/rust/transaction-filter/src/filters/event.rs @@ -1,4 +1,7 @@ -use crate::{filters::MoveStructTagFilter, json_search::JsonSearchTerm, traits::Filterable}; +use crate::{ + errors::FilterError, filters::MoveStructTagFilter, json_search::JsonSearchTerm, + traits::Filterable, +}; use anyhow::Error; use aptos_protos::transaction::v1::{move_type::Content, Event}; use serde::{Deserialize, Serialize}; @@ -15,11 +18,9 @@ pub struct EventFilter { impl Filterable for EventFilter { #[inline] - fn 
validate_state(&self) -> Result<(), Error> { + fn validate_state(&self) -> Result<(), FilterError> { if self.data.is_none() && self.struct_type.is_none() { - return Err(Error::msg( - "At least one of data or struct_type must be set", - )); + return Err(Error::msg("At least one of data or struct_type must be set").into()); }; self.data.is_valid()?; diff --git a/rust/transaction-filter/src/filters/move_module.rs b/rust/transaction-filter/src/filters/move_module.rs index e78f084a7..2e23eeb2c 100644 --- a/rust/transaction-filter/src/filters/move_module.rs +++ b/rust/transaction-filter/src/filters/move_module.rs @@ -1,5 +1,5 @@ -use crate::traits::Filterable; -use anyhow::{anyhow, Error}; +use crate::{errors::FilterError, traits::Filterable}; +use anyhow::anyhow; use aptos_protos::transaction::v1::{MoveModuleId, MoveStructTag}; use serde::{Deserialize, Serialize}; @@ -14,9 +14,9 @@ pub struct MoveModuleFilter { impl Filterable for MoveModuleFilter { #[inline] - fn validate_state(&self) -> Result<(), Error> { + fn validate_state(&self) -> Result<(), FilterError> { if self.address.is_none() && self.name.is_none() { - return Err(anyhow!("At least one of address or name must be set")); + return Err(anyhow!("At least one of address or name must be set").into()); }; Ok(()) } @@ -40,11 +40,9 @@ pub struct MoveStructTagFilter { impl Filterable for MoveStructTagFilter { #[inline] - fn validate_state(&self) -> Result<(), Error> { + fn validate_state(&self) -> Result<(), FilterError> { if self.address.is_none() && self.module.is_none() && self.name.is_none() { - return Err(anyhow!( - "At least one of address, module or name must be set" - )); + return Err(anyhow!("At least one of address, module or name must be set").into()); }; Ok(()) } diff --git a/rust/transaction-filter/src/filters/positional.rs b/rust/transaction-filter/src/filters/positional.rs index 296085131..304a962d7 100644 --- a/rust/transaction-filter/src/filters/positional.rs +++ 
b/rust/transaction-filter/src/filters/positional.rs @@ -1,4 +1,4 @@ -use crate::traits::Filterable; +use crate::{errors::FilterError, traits::Filterable}; use anyhow::Error; use serde::{Deserialize, Serialize}; use std::fmt::Debug; @@ -16,9 +16,9 @@ where impl Filterable> for PositionalFilter where - T: PartialEq + Debug, + T: PartialEq + Debug + Serialize, { - fn validate_state(&self) -> Result<(), Error> { + fn validate_state(&self) -> Result<(), FilterError> { Ok(()) } @@ -29,13 +29,11 @@ where impl Filterable> for Vec> where - T: PartialEq + Debug, + T: PartialEq + Debug + Serialize, { - fn validate_state(&self) -> Result<(), Error> { + fn validate_state(&self) -> Result<(), FilterError> { if self.is_empty() { - return Err(Error::msg( - "PositionalFilter must have at least one element", - )); + return Err(Error::msg("PositionalFilter must have at least one element").into()); } Ok(()) } diff --git a/rust/transaction-filter/src/filters/transaction_root.rs b/rust/transaction-filter/src/filters/transaction_root.rs index 9e7900050..bb81556e7 100644 --- a/rust/transaction-filter/src/filters/transaction_root.rs +++ b/rust/transaction-filter/src/filters/transaction_root.rs @@ -1,4 +1,4 @@ -use crate::traits::Filterable; +use crate::{errors::FilterError, traits::Filterable}; use anyhow::Error; use aptos_protos::transaction::v1::{transaction::TransactionType, Transaction}; use serde::{Deserialize, Serialize}; @@ -14,11 +14,9 @@ pub struct TransactionRootFilter { impl Filterable for TransactionRootFilter { #[inline] - fn validate_state(&self) -> Result<(), Error> { + fn validate_state(&self) -> Result<(), FilterError> { if self.success.is_none() && self.txn_type.is_none() { - return Err(Error::msg( - "At least one of success or txn_types must be set", - )); + return Err(Error::msg("At least one of success or txn_types must be set").into()); }; Ok(()) } diff --git a/rust/transaction-filter/src/filters/user_transaction_request.rs 
b/rust/transaction-filter/src/filters/user_transaction_request.rs index a32851495..f3b9b2f9f 100644 --- a/rust/transaction-filter/src/filters/user_transaction_request.rs +++ b/rust/transaction-filter/src/filters/user_transaction_request.rs @@ -1,4 +1,4 @@ -use crate::{filters::PositionalFilter, traits::Filterable}; +use crate::{errors::FilterError, filters::PositionalFilter, traits::Filterable}; use anyhow::{anyhow, Error}; use aptos_protos::transaction::v1::{ multisig_transaction_payload, transaction_payload, EntryFunctionId, EntryFunctionPayload, @@ -19,9 +19,9 @@ pub struct UserTransactionRequestFilter { impl Filterable for UserTransactionRequestFilter { #[inline] - fn validate_state(&self) -> Result<(), Error> { + fn validate_state(&self) -> Result<(), FilterError> { if self.sender.is_none() && self.payload.is_none() { - return Err(Error::msg("At least one of sender or payload must be set")); + return Err(Error::msg("At least one of sender or payload must be set").into()); }; self.payload.is_valid()?; Ok(()) @@ -66,11 +66,9 @@ pub struct EntryFunctionFilter { impl Filterable for EntryFunctionFilter { #[inline] - fn validate_state(&self) -> Result<(), Error> { + fn validate_state(&self) -> Result<(), FilterError> { if self.address.is_none() && self.module.is_none() && self.function.is_none() { - return Err(anyhow!( - "At least one of address, name or function must be set" - )); + return Err(anyhow!("At least one of address, name or function must be set").into()); }; Ok(()) } @@ -109,11 +107,9 @@ pub struct UserTransactionPayloadFilter { impl Filterable for UserTransactionPayloadFilter { #[inline] - fn validate_state(&self) -> Result<(), Error> { + fn validate_state(&self) -> Result<(), FilterError> { if self.function.is_none() && self.arguments.is_none() { - return Err(Error::msg( - "At least one of function or arguments must be set", - )); + return Err(Error::msg("At least one of function or arguments must be set").into()); }; self.function.is_valid()?; 
self.arguments.is_valid()?; diff --git a/rust/transaction-filter/src/filters/write_set_change_filter.rs b/rust/transaction-filter/src/filters/write_set_change_filter.rs index 11dccb293..a43eace7d 100644 --- a/rust/transaction-filter/src/filters/write_set_change_filter.rs +++ b/rust/transaction-filter/src/filters/write_set_change_filter.rs @@ -1,4 +1,5 @@ use crate::{ + errors::FilterError, filters::MoveStructTagFilter, json_search::{JsonOrStringSearch, JsonSearchTerm}, traits::Filterable, @@ -24,9 +25,9 @@ pub struct WriteSetChangeFilter { impl Filterable for WriteSetChangeFilter { #[inline] - fn validate_state(&self) -> Result<(), Error> { + fn validate_state(&self) -> Result<(), FilterError> { if self.change.is_none() { - return Err(Error::msg("field change must be set")); + return Err(Error::msg("field change must be set").into()); }; self.change.is_valid()?; Ok(()) @@ -100,7 +101,7 @@ pub enum ChangeItemFilter { impl Filterable for ChangeItemFilter { #[inline] - fn validate_state(&self) -> Result<(), Error> { + fn validate_state(&self) -> Result<(), FilterError> { match self { ChangeItemFilter::ResourceChange(rcf) => rcf.is_valid(), ChangeItemFilter::ModuleChange(mcf) => mcf.is_valid(), @@ -171,11 +172,11 @@ pub enum ResourceChange<'a> { impl Filterable> for ResourceChangeFilter { #[inline] - fn validate_state(&self) -> Result<(), Error> { + fn validate_state(&self) -> Result<(), FilterError> { if self.resource_type.is_none() && self.address.is_none() { - return Err(Error::msg( - "At least one of resource_type, address, or data must be set", - )); + return Err( + Error::msg("At least one of resource_type, address, or data must be set").into(), + ); }; self.resource_type.is_valid()?; self.data.is_valid()?; @@ -235,9 +236,9 @@ pub enum ModuleChange<'a> { impl Filterable> for ModuleChangeFilter { #[inline] - fn validate_state(&self) -> Result<(), Error> { + fn validate_state(&self) -> Result<(), FilterError> { if self.address.is_none() { - return 
Err(Error::msg("At least one of address must be set")); + return Err(Error::msg("At least one of address must be set").into()); }; Ok(()) } @@ -268,11 +269,11 @@ pub enum TableChange<'a> { } impl Filterable> for TableChangeFilter { #[inline] - fn validate_state(&self) -> Result<(), Error> { + fn validate_state(&self) -> Result<(), FilterError> { if self.handle.is_none() && self.key.is_none() && self.key_type_str.is_none() { - return Err(Error::msg( - "At least one of handle, key, or key_type_str must be set", - )); + return Err( + Error::msg("At least one of handle, key, or key_type_str must be set").into(), + ); }; Ok(()) } diff --git a/rust/transaction-filter/src/json_search.rs b/rust/transaction-filter/src/json_search.rs index 54e538698..fdb0faf5d 100644 --- a/rust/transaction-filter/src/json_search.rs +++ b/rust/transaction-filter/src/json_search.rs @@ -1,5 +1,4 @@ -use crate::traits::Filterable; -use anyhow::Error; +use crate::{errors::FilterError, traits::Filterable}; use memchr::memmem::Finder; use serde::{Deserialize, Serialize}; use thiserror::Error; @@ -157,7 +156,7 @@ impl JsonSearchTerm { } impl Filterable for JsonSearchTerm { - fn validate_state(&self) -> Result<(), Error> { + fn validate_state(&self) -> Result<(), FilterError> { // Validation is performed elsewhere Ok(()) } @@ -176,7 +175,7 @@ pub enum JsonOrStringSearch { } impl Filterable for JsonOrStringSearch { - fn validate_state(&self) -> Result<(), Error> { + fn validate_state(&self) -> Result<(), FilterError> { match self { JsonOrStringSearch::Json(json) => json.is_valid(), JsonOrStringSearch::String(_) => Ok(()), diff --git a/rust/transaction-filter/src/lib.rs b/rust/transaction-filter/src/lib.rs index 1f7766fe7..aa8a4cbb9 100644 --- a/rust/transaction-filter/src/lib.rs +++ b/rust/transaction-filter/src/lib.rs @@ -4,6 +4,7 @@ pub mod filters; pub mod json_search; pub mod traits; +mod errors; #[cfg(test)] pub mod test_lib; diff --git a/rust/transaction-filter/src/traits.rs 
b/rust/transaction-filter/src/traits.rs index f6426ae57..1fa0d8c96 100644 --- a/rust/transaction-filter/src/traits.rs +++ b/rust/transaction-filter/src/traits.rs @@ -1,19 +1,38 @@ -use anyhow::Error; +use crate::errors::FilterError; +use serde::Serialize; +use std::fmt::Debug; /// Simple trait to allow for filtering of items of type T -pub trait Filterable { +pub trait Filterable +where + Self: Debug + Serialize, +{ /// Whether this filter is correctly configured/initialized /// Any call to `validate_state` is responsible for recursively checking the validity of any nested filters *by calling `is_valid`* /// The actual public API is via `is_valid` which will call `validate_state` and return an error if it fails, but annotated with the filter type/path - fn validate_state(&self) -> Result<(), Error>; - + fn validate_state(&self) -> Result<(), FilterError>; + + /** + * This is a convenience method to allow for the error to be annotated with the filter type/path at each level + * This is the public API for checking the validity of a filter! + * Example output looks like: + * ```text + * FilterError: This is a test error!. 
+ * Trace Path: + * transaction_filter::traits::test::InnerStruct: {"a":"test"} + * core::option::Option: {"a":"test"} + * transaction_filter::traits::test::OuterStruct: {"inner":{"a":"test"}} + * ``` + **/ #[inline] - fn is_valid(&self) -> Result<(), Error> { - println!("calling: {}", std::any::type_name::()); - // This is a convenience method to allow for the error to be annotated with the filter type/path - self.validate_state().map_err(|e| { - println!("erroring: {}", std::any::type_name::()); - e.context(std::any::type_name::()) + fn is_valid(&self) -> Result<(), FilterError> { + // T + self.validate_state().map_err(|mut e| { + e.add_trace( + serde_json::to_string(self).unwrap(), + std::any::type_name::().to_string(), + ); + e }) } @@ -47,7 +66,7 @@ where F: Filterable, { #[inline] - fn validate_state(&self) -> Result<(), Error> { + fn validate_state(&self) -> Result<(), FilterError> { match self { Some(filter) => filter.is_valid(), None => Ok(()), @@ -73,7 +92,7 @@ where impl Filterable for Option { #[inline] - fn validate_state(&self) -> Result<(), Error> { + fn validate_state(&self) -> Result<(), FilterError> { Ok(()) } @@ -88,7 +107,7 @@ impl Filterable for Option { impl Filterable for Option { #[inline] - fn validate_state(&self) -> Result<(), Error> { + fn validate_state(&self) -> Result<(), FilterError> { Ok(()) } @@ -103,7 +122,7 @@ impl Filterable for Option { impl Filterable for Option { #[inline] - fn validate_state(&self) -> Result<(), Error> { + fn validate_state(&self) -> Result<(), FilterError> { Ok(()) } @@ -121,14 +140,14 @@ mod test { use super::*; use anyhow::anyhow; - #[derive(Debug, PartialEq)] + #[derive(Debug, Serialize, PartialEq)] pub struct InnerStruct { pub a: Option, } impl Filterable for InnerStruct { - fn validate_state(&self) -> Result<(), Error> { - Err(anyhow!("this is an error")) + fn validate_state(&self) -> Result<(), FilterError> { + Err(anyhow!("This is a test error!").into()) } fn is_allowed(&self, _item: &InnerStruct) -> 
bool { @@ -136,13 +155,13 @@ mod test { } } - #[derive(Debug, PartialEq)] + #[derive(Debug, PartialEq, Serialize)] pub struct OuterStruct { pub inner: Option, } impl Filterable for OuterStruct { - fn validate_state(&self) -> Result<(), Error> { + fn validate_state(&self) -> Result<(), FilterError> { self.inner.is_valid()?; Ok(()) } @@ -161,6 +180,7 @@ mod test { let res = outer.is_valid(); assert!(res.is_err()); - assert_eq!(res.unwrap_err().to_string(), "err"); + let error = res.unwrap_err(); + assert_eq!(error.to_string(), "Filter Error: This is a test error!\nTrace Path:\ntransaction_filter::traits::test::InnerStruct: {\"a\":\"test\"}\ncore::option::Option: {\"a\":\"test\"}\ntransaction_filter::traits::test::OuterStruct: {\"inner\":{\"a\":\"test\"}}"); } } From f2be1db8050672d19ecffe6c6a4e5f017f03b79f Mon Sep 17 00:00:00 2001 From: CapCap Date: Tue, 11 Jun 2024 11:15:10 -0700 Subject: [PATCH 5/7] remove api filter --- rust/transaction-filter/src/api_filter.rs | 231 ------------------ .../transaction-filter/src/filter_operator.rs | 15 +- .../src/filters/user_transaction_request.rs | 22 +- rust/transaction-filter/src/lib.rs | 1 - 4 files changed, 17 insertions(+), 252 deletions(-) delete mode 100644 rust/transaction-filter/src/api_filter.rs diff --git a/rust/transaction-filter/src/api_filter.rs b/rust/transaction-filter/src/api_filter.rs deleted file mode 100644 index 90cbc0cce..000000000 --- a/rust/transaction-filter/src/api_filter.rs +++ /dev/null @@ -1,231 +0,0 @@ -// use crate::traits::Filterable; -use crate::{ - errors::FilterError, - filters::{ - EventFilter, TransactionRootFilter, UserTransactionRequestFilter, WriteSetChangeFilter, - }, - traits::Filterable, -}; -use aptos_protos::transaction::v1::{transaction::TxnData, Transaction}; -use serde::{Deserialize, Serialize}; -use std::fmt::Debug; - -/// These are filters we would expect to be exposed via API -/// If any of these filters match, the transaction returns true -#[derive(Debug, Deserialize, 
PartialEq, Serialize)] -#[serde(deny_unknown_fields)] -pub struct PublicOrApiFilter { - #[serde(skip_serializing_if = "Option::is_none")] - pub root_filter: Option, - #[serde(skip_serializing_if = "Option::is_none")] - pub user_transaction_filter: Option, - #[serde(skip_serializing_if = "Option::is_none")] - pub event_filter: Option>, - #[serde(skip_serializing_if = "Option::is_none")] - pub write_set_change_filter: Option>, -} - -impl Filterable for PublicOrApiFilter { - fn validate_state(&self) -> Result<(), FilterError> { - if self.root_filter.is_none() - && self.user_transaction_filter.is_none() - && self.event_filter.is_none() - && self.write_set_change_filter.is_none() - { - return Err(anyhow::anyhow!( - "At least one of root_filter, user_transaction_filter, event_filter, or write_set_change_filter must be set" - ).into()); - }; - - self.root_filter.is_valid()?; - self.user_transaction_filter.is_valid()?; - if let Some(event_filters) = &self.event_filter { - for event_filter in event_filters { - event_filter.is_valid()?; - } - } - if let Some(write_set_change_filters) = &self.write_set_change_filter { - for write_set_change_filter in write_set_change_filters { - write_set_change_filter.is_valid()?; - } - } - - Ok(()) - } - - fn is_allowed(&self, txn: &Transaction) -> bool { - if self.root_filter.is_allowed(txn) { - return true; - } - - if let Some(ut_filter) = &self.user_transaction_filter { - let txn_filter_res = txn.txn_data.as_ref().map(|txn_data| { - if let TxnData::User(u) = txn_data { - u.request - .as_ref() - .map(|req| ut_filter.is_allowed(req)) - .unwrap_or(false) - } else { - false - } - }); - - if let Some(txn_filter_res) = txn_filter_res { - if txn_filter_res { - return true; - } - } - } - - if let Some(events_filter) = &self.event_filter { - if let Some(txn_data) = &txn.txn_data { - let events = match txn_data { - TxnData::BlockMetadata(bm) => Some(&bm.events), - TxnData::Genesis(g) => Some(&g.events), - TxnData::StateCheckpoint(_) => None, - 
TxnData::User(u) => Some(&u.events), - TxnData::Validator(_) => None, - }; - if let Some(events) = events { - for event_filter in events_filter { - if event_filter.is_allowed_vec(events) { - return true; - } - } - } - } - } - - if let Some(changes_filter) = &self.write_set_change_filter { - let changes = &txn.info.as_ref().map(|inf| &inf.changes); - for change_filter in changes_filter { - if change_filter.is_allowed_opt_vec(changes) { - return true; - } - } - } - - false - } -} - -#[cfg(test)] -mod test { - use super::*; - use crate::{ - filters::{ - user_transaction_request::EntryFunctionFilter, - write_set_change_filter::{ - ChangeItemFilter, ModuleChangeFilter, ResourceChangeFilter, TableChangeFilter, - }, - MoveStructTagFilter, PositionalFilter, UserTransactionPayloadFilter, - }, - json_search::{JsonOrStringSearch, JsonSearchTerm}, - test_lib::{load_graffio_fixture, load_random_april_3mb_fixture, load_taptos_fixture}, - }; - use aptos_protos::indexer::v1::TransactionsInStorage; - - #[test] - pub fn test_query_parsing() { - let trf = TransactionRootFilter { - success: Some(true), - txn_type: Some(aptos_protos::transaction::v1::transaction::TransactionType::User), - }; - - let utrf = UserTransactionRequestFilter { - sender: Some("0x0011".into()), - payload: Some(UserTransactionPayloadFilter { - function: Some(EntryFunctionFilter { - address: Some("0x001".into()), - module: Some("module".into()), - function: Some("F".into()), - }), - arguments: Some(vec![PositionalFilter { - index: 0, - value: "0x0011".into(), - }]), - }), - }; - - let ef1 = EventFilter { - data: Some(JsonSearchTerm::new("spins".into(), 5.into()).unwrap()), - struct_type: Some(MoveStructTagFilter { - address: Some("0x0077".into()), - module: Some("roulette".into()), - name: Some("spin".into()), - }), - }; - - let ef2 = EventFilter { - data: Some(JsonSearchTerm::new("debt".into(), 12.into()).unwrap()), - struct_type: Some(MoveStructTagFilter { - address: Some("0x0052".into()), - module: 
Some("lending".into()), - name: Some("borrow_to_gamble".into()), - }), - }; - - let wscf_res = WriteSetChangeFilter { - change: Some(ChangeItemFilter::ResourceChange(ResourceChangeFilter { - resource_type: Some(MoveStructTagFilter { - address: Some("0x001af32".into()), - module: Some("airport".into()), - name: Some("airplane".into()), - }), - address: Some("0x001af32".into()), - data: Some(JsonSearchTerm::new("takeoff".into(), true.into()).unwrap()), - })), - }; - let wscf_table = WriteSetChangeFilter { - change: Some(ChangeItemFilter::TableChange(TableChangeFilter { - handle: Some("0x796857465434253644536475453432453".into()), - key: Some(JsonOrStringSearch::String("table_key".into())), - key_type_str: Some("0x423453466345::some_module::SomeStruct".into()), - })), - }; - let wscf_mod = WriteSetChangeFilter { - change: Some(ChangeItemFilter::ModuleChange(ModuleChangeFilter { - address: Some("0x0000098".into()), - })), - }; - - let query = PublicOrApiFilter { - root_filter: Some(trf), - user_transaction_filter: Some(utrf), - event_filter: Some(vec![ef1, ef2]), - write_set_change_filter: Some(vec![wscf_res, wscf_table, wscf_mod]), - }; - - let tapos_txns = load_taptos_fixture(); - let random3mb_txns = load_random_april_3mb_fixture(); - let graffio_txns = load_graffio_fixture(); - - test_filter(&query, &tapos_txns, "graffio"); - test_filter(&query, &random3mb_txns, "random3mb"); - test_filter(&query, &graffio_txns, "tapos"); - } - - fn test_filter(query: &PublicOrApiFilter, txns: &TransactionsInStorage, set_name: &str) { - println!( - "SET {}:> Json Query Representation: \n {}", - set_name, - serde_json::to_string_pretty(query).unwrap() - ); - const LOOPS: usize = 1000; - let start = std::time::Instant::now(); - for _ in 0..LOOPS { - for txn in &txns.transactions { - query.is_allowed(txn); - } - } - let elapsed = start.elapsed(); - let total_txn = LOOPS * txns.transactions.len(); - println!( - "BENCH SET {}:> Took {:?} for {} transactions ({:?} each)\n\n", - set_name, 
- elapsed, - total_txn, - elapsed / total_txn as u32 - ); - } -} diff --git a/rust/transaction-filter/src/filter_operator.rs b/rust/transaction-filter/src/filter_operator.rs index 7f41d90ca..e15b8cbc8 100644 --- a/rust/transaction-filter/src/filter_operator.rs +++ b/rust/transaction-filter/src/filter_operator.rs @@ -33,20 +33,7 @@ impl Filterable for APIFilter { fn is_allowed(&self, txn: &Transaction) -> bool { match self { APIFilter::TransactionRootFilter(filter) => filter.is_allowed(txn), - APIFilter::UserTransactionRequestFilter(ut_filter) => txn - .txn_data - .as_ref() - .map(|txn_data| { - if let TxnData::User(u) = txn_data { - u.request - .as_ref() - .map(|req| ut_filter.is_allowed(req)) - .unwrap_or(false) - } else { - false - } - }) - .unwrap_or(false), + APIFilter::UserTransactionRequestFilter(ut_filter) => ut_filter.is_allowed(&txn), APIFilter::EventFilter(events_filter) => { if let Some(txn_data) = &txn.txn_data { let events = match txn_data { diff --git a/rust/transaction-filter/src/filters/user_transaction_request.rs b/rust/transaction-filter/src/filters/user_transaction_request.rs index f3b9b2f9f..a4bf36f64 100644 --- a/rust/transaction-filter/src/filters/user_transaction_request.rs +++ b/rust/transaction-filter/src/filters/user_transaction_request.rs @@ -1,8 +1,8 @@ use crate::{errors::FilterError, filters::PositionalFilter, traits::Filterable}; use anyhow::{anyhow, Error}; use aptos_protos::transaction::v1::{ - multisig_transaction_payload, transaction_payload, EntryFunctionId, EntryFunctionPayload, - TransactionPayload, UserTransactionRequest, + multisig_transaction_payload, transaction::TxnData, transaction_payload, EntryFunctionId, + EntryFunctionPayload, Transaction, TransactionPayload, }; use serde::{Deserialize, Serialize}; @@ -17,7 +17,7 @@ pub struct UserTransactionRequestFilter { pub payload: Option, } -impl Filterable for UserTransactionRequestFilter { +impl Filterable for UserTransactionRequestFilter { #[inline] fn validate_state(&self) 
-> Result<(), FilterError> { if self.sender.is_none() && self.payload.is_none() { @@ -28,16 +28,26 @@ impl Filterable for UserTransactionRequestFilter { } #[inline] - fn is_allowed(&self, item: &UserTransactionRequest) -> bool { + fn is_allowed(&self, txn: &Transaction) -> bool { + let user_request = if let Some(TxnData::User(u)) = txn.txn_data.as_ref() { + if let Some(user_request) = u.request.as_ref() { + user_request + } else { + return false; + } + } else { + return false; + }; + if let Some(sender_filter) = &self.sender { - if &item.sender != sender_filter { + if &user_request.sender != sender_filter { return false; } } if let Some(payload_filter) = &self.payload { // Get the entry_function_payload from both UserPayload and MultisigPayload - let entry_function_payload = item + let entry_function_payload = user_request .payload .as_ref() .and_then(get_entry_function_payload_from_transaction_payload); diff --git a/rust/transaction-filter/src/lib.rs b/rust/transaction-filter/src/lib.rs index aa8a4cbb9..a3a69e679 100644 --- a/rust/transaction-filter/src/lib.rs +++ b/rust/transaction-filter/src/lib.rs @@ -1,4 +1,3 @@ -pub mod api_filter; pub mod filter_operator; pub mod filters; pub mod json_search; From 34188cc7d22da8fd8d0eb89dc25b384a29b3ba8b Mon Sep 17 00:00:00 2001 From: CapCap Date: Tue, 11 Jun 2024 11:20:14 -0700 Subject: [PATCH 6/7] docs --- rust/transaction-filter/src/traits.rs | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/rust/transaction-filter/src/traits.rs b/rust/transaction-filter/src/traits.rs index 1fa0d8c96..9a5a402e9 100644 --- a/rust/transaction-filter/src/traits.rs +++ b/rust/transaction-filter/src/traits.rs @@ -36,6 +36,11 @@ where }) } + /// Whether the item is allowed by this filter + /// This is the core method that should be implemented by any filter + /// This is the method that should be called by any parent filter to determine if an item is allowed + /// *If a filter doesn't explicitly prevent an item, then it should be allowed* 
+ /// This forces the logic of `if !child_filter.is_allowed(item) { return false; }` for any parent filter fn is_allowed(&self, item: &T) -> bool; #[inline] From c698d1c411f3e56c8e65f524599d6679b7e8a188 Mon Sep 17 00:00:00 2001 From: CapCap Date: Tue, 11 Jun 2024 11:20:59 -0700 Subject: [PATCH 7/7] lint --- rust/transaction-filter/src/filter_operator.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rust/transaction-filter/src/filter_operator.rs b/rust/transaction-filter/src/filter_operator.rs index e15b8cbc8..a54f41ca5 100644 --- a/rust/transaction-filter/src/filter_operator.rs +++ b/rust/transaction-filter/src/filter_operator.rs @@ -33,7 +33,7 @@ impl Filterable for APIFilter { fn is_allowed(&self, txn: &Transaction) -> bool { match self { APIFilter::TransactionRootFilter(filter) => filter.is_allowed(txn), - APIFilter::UserTransactionRequestFilter(ut_filter) => ut_filter.is_allowed(&txn), + APIFilter::UserTransactionRequestFilter(ut_filter) => ut_filter.is_allowed(txn), APIFilter::EventFilter(events_filter) => { if let Some(txn_data) = &txn.txn_data { let events = match txn_data {