Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] More Advanced Transaction Filtering #389

Merged
merged 7 commits into from
Jun 11, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions rust/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions rust/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ jemallocator = { version = "0.5.0", features = [
] }
kanal = { version = "0.1.0-pre8", features = ["async"] }
once_cell = "1.10.0"
# SIMD for string search
memchr = "2.7.2"
num_cpus = "1.16.0"
pbjson = "0.5.1"
prometheus = { version = "0.13.0", default-features = false }
Expand Down
5 changes: 4 additions & 1 deletion rust/processor/src/utils/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,11 @@ pub struct MultisigPayloadClean {
}

/// Standardizes all addresses and table handles to be length 66 (0x-64 length hash)
#[inline]
pub fn standardize_address(handle: &str) -> String {
if let Some(handle) = handle.strip_prefix("0x") {
if handle.len() == 66 {
handle.to_string()
} else if let Some(handle) = handle.strip_prefix("0x") {
format!("0x{:0>64}", handle)
} else {
format!("0x{:0>64}", handle)
Expand Down
3 changes: 3 additions & 0 deletions rust/transaction-filter/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ rust-version = { workspace = true }
anyhow = { workspace = true }
aptos-protos = { workspace = true }

# SIMD for string search. TODO: benchmark this on various real inputs to see if it's worth it
memchr = { workspace = true }

prost = { workspace = true }

serde = { workspace = true }
Expand Down
334 changes: 334 additions & 0 deletions rust/transaction-filter/src/filter_operator.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,334 @@
// use crate::traits::Filterable;
use crate::{
filters::{
EventFilter, TransactionRootFilter, UserTransactionRequestFilter, WriteSetChangeFilter,
},
traits::Filterable,
};
use anyhow::Error;
use aptos_protos::transaction::v1::{transaction::TxnData, Transaction};
use serde::{Deserialize, Serialize};
use std::fmt::Debug;

/// These are filters we would expect to be exposed via API
#[derive(Debug, Deserialize, PartialEq, Serialize)]
#[serde(deny_unknown_fields)]
#[serde(tag = "type")]
pub enum APIFilter {
TransactionRootFilter(TransactionRootFilter),
UserTransactionRequestFilter(UserTransactionRequestFilter),
EventFilter(EventFilter),
WriteSetChangeFilter(WriteSetChangeFilter),
}

impl Filterable<Transaction> for APIFilter {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You could use enum_dispatch / ambassador to help eliminate some of the boilerplate here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

enum dispatchhhhh

fn is_valid(&self) -> Result<(), Error> {
match self {
APIFilter::TransactionRootFilter(filter) => filter.is_valid(),
APIFilter::UserTransactionRequestFilter(filter) => filter.is_valid(),
APIFilter::EventFilter(filter) => filter.is_valid(),
APIFilter::WriteSetChangeFilter(filter) => filter.is_valid(),
}
}

fn is_allowed(&self, txn: &Transaction) -> bool {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't wanna nerd snipe you but it'd be neat if this function at the top level told you which filter allowed it if this is true 😛

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what do you mean?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Like if a transaction got filtered out, it'd be cool to see which filter exactly it failed on.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sigh :-p

this is not technically hard, just a pain in the ass hahaha... Follow up PR?

match self {
APIFilter::TransactionRootFilter(filter) => filter.is_allowed(txn),
APIFilter::UserTransactionRequestFilter(ut_filter) => txn
.txn_data
.as_ref()
.map(|txn_data| {
if let TxnData::User(u) = txn_data {
u.request
.as_ref()
.map(|req| ut_filter.is_allowed(req))
.unwrap_or(false)
} else {
false
}
})
.unwrap_or(false),
CapCap marked this conversation as resolved.
Show resolved Hide resolved
APIFilter::EventFilter(events_filter) => {
if let Some(txn_data) = &txn.txn_data {
let events = match txn_data {
TxnData::BlockMetadata(bm) => &bm.events,
TxnData::Genesis(g) => &g.events,
TxnData::StateCheckpoint(_) => return false,
TxnData::User(u) => &u.events,
TxnData::Validator(_) => return false,
};
events_filter.is_allowed_vec(events)
} else {
false
}
},
APIFilter::WriteSetChangeFilter(changes_filter) => {
changes_filter.is_allowed_opt_vec(&txn.info.as_ref().map(|inf| &inf.changes))
},
}
}
}

#[derive(Debug, Deserialize, Serialize)]
#[serde(untagged)]
pub enum FilterOperator {
And(LogicalAnd),
Or(LogicalOr),
Filter(APIFilter),
}

impl FilterOperator {
pub fn new_and(and: Vec<FilterOperator>) -> Self {
FilterOperator::And(LogicalAnd { and })
}

pub fn new_or(or: Vec<FilterOperator>) -> Self {
FilterOperator::Or(LogicalOr { or })
}

pub fn new_filter(filter: APIFilter) -> Self {
FilterOperator::Filter(filter)
}
}

impl Filterable<Transaction> for FilterOperator {
fn is_valid(&self) -> Result<(), Error> {
match self {
FilterOperator::And(and) => and.is_valid(),
FilterOperator::Or(or) => or.is_valid(),
FilterOperator::Filter(filter) => filter.is_valid(),
}
}

fn is_allowed(&self, item: &Transaction) -> bool {
match self {
FilterOperator::And(and) => and.is_allowed(item),
FilterOperator::Or(or) => or.is_allowed(item),
FilterOperator::Filter(filter) => filter.is_allowed(item),
}
}
}

#[derive(Debug, Deserialize, Serialize)]
pub struct LogicalAnd {
and: Vec<FilterOperator>,
}

impl Filterable<Transaction> for LogicalAnd {
fn is_valid(&self) -> Result<(), Error> {
for filter in &self.and {
filter.is_valid()?;
}
Ok(())
}

fn is_allowed(&self, item: &Transaction) -> bool {
CapCap marked this conversation as resolved.
Show resolved Hide resolved
self.and.iter().all(|filter| filter.is_allowed(item))
}
}

#[derive(Debug, Deserialize, Serialize)]
pub struct LogicalOr {
or: Vec<FilterOperator>,
}

impl Filterable<Transaction> for LogicalOr {
fn is_valid(&self) -> Result<(), Error> {
for filter in &self.or {
filter.is_valid()?;
}
Ok(())
}

fn is_allowed(&self, item: &Transaction) -> bool {
self.or.iter().any(|filter| filter.is_allowed(item))
}
}

#[cfg(test)]
mod test {
use super::*;
use crate::{
filters::{
user_transaction_request::EntryFunctionFilter,
write_set_change_filter::{
ChangeItemFilter, ModuleChangeFilter, ResourceChangeFilter, TableChangeFilter,
},
MoveStructTagFilter, PositionalFilter, UserTransactionPayloadFilter,
},
json_search::{JsonOrStringSearch, JsonSearchTerm},
};
use aptos_protos::indexer::v1::TransactionsInStorage;
use prost::Message;
use std::io::Read;

// Decompress fixtures first, Ex:

fn decompress_fixture(bytes: &[u8]) -> TransactionsInStorage {
let mut decompressor = lz4::Decoder::new(bytes).expect("Lz4 decompression failed.");
let mut decompressed = Vec::new();
decompressor
.read_to_end(&mut decompressed)
.expect("Lz4 decompression failed.");
TransactionsInStorage::decode(decompressed.as_slice()).expect("Failed to parse transaction")
}

#[allow(dead_code)]
fn load_taptos_fixture() -> TransactionsInStorage {
let data = include_bytes!(
"../fixtures/compressed_files_lz4_00008bc1d5adcf862d3967c1410001fb_705101000.pb.lz4"
);
decompress_fixture(data)
}

#[allow(dead_code)]
fn load_random_april_3mb_fixture() -> TransactionsInStorage {
let data = include_bytes!(
"../fixtures/compressed_files_lz4_0013c194ec4fdbfb8db7306170aac083_445907000.pb.lz4"
);
decompress_fixture(data)
}

#[allow(dead_code)]
fn load_graffio_fixture() -> TransactionsInStorage {
let data = include_bytes!(
"../fixtures/compressed_files_lz4_f3d880d9700c70d71fefe71aa9218aa9_301616000.pb.lz4"
);
decompress_fixture(data)
}

#[test]
pub fn test_query_parsing() {
let trf = TransactionRootFilter {
success: Some(true),
txn_type: Some(aptos_protos::transaction::v1::transaction::TransactionType::User),
};

let utrf = UserTransactionRequestFilter {
sender: Some("0x0011".into()),
payload: Some(UserTransactionPayloadFilter {
function: Some(EntryFunctionFilter {
address: Some("0x001".into()),
module: Some("module".into()),
function: Some("F".into()),
}),
arguments: Some(vec![PositionalFilter {
index: 0,
value: "0x0011".into(),
}]),
}),
};

let ef = EventFilter {
data: Some(JsonSearchTerm::new("spins".into(), 5.into()).unwrap()),
struct_type: Some(MoveStructTagFilter {
address: Some("0x0077".into()),
module: Some("roulette".into()),
name: Some("spin".into()),
}),
};

let wscf_res = WriteSetChangeFilter {
change_type: Some(aptos_protos::transaction::v1::write_set_change::Type::WriteResource),
change: Some(ChangeItemFilter::ResourceChange(ResourceChangeFilter {
resource_type: Some(MoveStructTagFilter {
address: Some("0x001af32".into()),
module: Some("airport".into()),
name: Some("airplane".into()),
}),
address: Some("0x001af32".into()),
data: Some(JsonSearchTerm::new("takeoff".into(), true.into()).unwrap()),
})),
};
let wscf_table = WriteSetChangeFilter {
change_type: Some(
aptos_protos::transaction::v1::write_set_change::Type::WriteTableItem,
),
change: Some(ChangeItemFilter::TableChange(TableChangeFilter {
handle: Some("0x796857465434253644536475453432453".into()),
key: Some(JsonOrStringSearch::String("table_key".into())),
key_type_str: Some("0x423453466345::some_module::SomeStruct".into()),
})),
};
let wscf_mod = WriteSetChangeFilter {
change_type: Some(aptos_protos::transaction::v1::write_set_change::Type::WriteModule),
change: Some(ChangeItemFilter::ModuleChange(ModuleChangeFilter {
address: Some("0x0000098".into()),
})),
};

let write_set_ors = FilterOperator::new_or(vec![
FilterOperator::Filter(APIFilter::WriteSetChangeFilter(wscf_res)),
FilterOperator::Filter(APIFilter::WriteSetChangeFilter(wscf_table)),
FilterOperator::Filter(APIFilter::WriteSetChangeFilter(wscf_mod)),
]);

let event_filter_or_write_set = FilterOperator::new_or(vec![
FilterOperator::Filter(APIFilter::EventFilter(ef)),
write_set_ors,
]);

let transaction_root_and_request_filter = FilterOperator::new_or(vec![
FilterOperator::Filter(APIFilter::TransactionRootFilter(trf)),
FilterOperator::Filter(APIFilter::UserTransactionRequestFilter(utrf)),
]);

let query = FilterOperator::new_or(vec![
transaction_root_and_request_filter,
event_filter_or_write_set,
]);

println!(
"JSON RESULT: \n {}",
serde_json::to_string_pretty(&query).unwrap()
);

let txns = load_graffio_fixture();

// Benchmark how long it takes to do this 100 times
let start = std::time::Instant::now();
const LOOPS: i32 = 1000;
for _ in 0..LOOPS {
for txn in &txns.transactions {
query.is_allowed(txn);
}
}
let elapsed = start.elapsed();

let total_txn = LOOPS * txns.transactions.len() as i32;
println!(
"BENCH: Took {:?} for {} transactions ({:?} each)",
elapsed,
total_txn,
elapsed / total_txn as u32
);

let ef_econia = EventFilter {
data: None,
struct_type: Some(MoveStructTagFilter {
address: Some("0x00ECONIA".into()),
module: None,
name: None,
}),
};
let ef_aries = EventFilter {
data: None,
struct_type: Some(MoveStructTagFilter {
address: Some("0x00ARIES".into()),
module: None,
name: None,
}),
};
let query = FilterOperator::new_or(vec![
FilterOperator::Filter(APIFilter::EventFilter(ef_econia)),
FilterOperator::Filter(APIFilter::EventFilter(ef_aries)),
]);
println!(
"JSON RESULT: \n {}",
serde_json::to_string_pretty(&query).unwrap()
);

//println!("Filter result for u32: {}", filter.is_allowed(&item_u32)); // true
//println!("Filter result for String: {}", filter.is_allowed(&item_s)); // false
}
}
Loading
Loading