From 214f867f3f3f86de360591ec75f2fb84b097825d Mon Sep 17 00:00:00 2001 From: tgmichel Date: Fri, 6 Nov 2020 12:37:14 +0100 Subject: [PATCH] Support topic wildcards and conditional indexed parameters (#182) * VariadicValue wildcard and nesting support * Add FilteredParams type implementation * Public re-export FilteredParams * Use FilteredParams in EthApi * Fix checker --- rpc/core/src/types/filter.rs | 225 ++++++++++++++++++++++++++++++++- rpc/core/src/types/mod.rs | 2 +- rpc/src/eth.rs | 50 ++++---- rpc/src/eth_pubsub.rs | 233 +++++------------------------------ 4 files changed, 281 insertions(+), 229 deletions(-) diff --git a/rpc/core/src/types/filter.rs b/rpc/core/src/types/filter.rs index 3c372660c..9f7e3bf10 100644 --- a/rpc/core/src/types/filter.rs +++ b/rpc/core/src/types/filter.rs @@ -49,8 +49,12 @@ impl<'a, T> Deserialize<'a> for VariadicValue where T: DeserializeOwned { /// Filter Address pub type FilterAddress = VariadicValue; -/// Topic -pub type Topic = VariadicValue; +/// Topic, supports `A` | `null` | `[A,B,C]` | `[A,[B,C]]` | [null,[B,C]] | [null,[null,C]] +pub type Topic = VariadicValue> +>>; +/// FlatTopic, simplifies the matching logic. +pub type FlatTopic = VariadicValue>; /// Filter #[derive(Debug, PartialEq, Clone, Deserialize, Eq, Hash)] @@ -69,6 +73,223 @@ pub struct Filter { pub topics: Option, } +/// Helper for Filter matching. +/// Supports conditional indexed parameters and wildcards. +#[derive(Debug)] +pub struct FilteredParams { + pub filter: Option, + flat_topics: Vec, +} + +impl Default for FilteredParams { + fn default() -> Self { + FilteredParams { + filter: None, + flat_topics: Vec::new() + } + } +} + +impl FilteredParams { + pub fn new( + f: Option, + ) -> Self { + if let Some(f) = f { + return FilteredParams { + filter: Some(f.clone()), + flat_topics: { + if let Some(t) = f.clone().topics { + Self::flatten(&t) + } else { Vec:: new() } + } + }; + } + Self::default() + } + /// Cartesian product for VariadicValue conditional indexed parameters. + /// Executed once on struct instance. + /// i.e. `[A,[B,C]]` to `[[A,B],[A,C]]`. + fn flatten(topic: &Topic) -> Vec { + fn cartesian(lists: &Vec>>) -> Vec>> { + let mut res = vec![]; + let mut list_iter = lists.iter(); + if let Some(first_list) = list_iter.next() { + for &i in first_list { + res.push(vec![i]); + } + } + for l in list_iter { + let mut tmp = vec![]; + for r in res { + for &el in l { + let mut tmp_el = r.clone(); + tmp_el.push(el); + tmp.push(tmp_el); + } + } + res = tmp; + } + res + } + let mut out: Vec = Vec::new(); + match topic { + VariadicValue::Multiple(multi) => { + let mut foo: Vec>> = Vec::new(); + for v in multi { + foo.push({ + if let Some(v) = v { + match v { + VariadicValue::Single(s) => { + vec![s.clone()] + }, + VariadicValue::Multiple(s) => { + s.clone() + }, + VariadicValue::Null => { + vec![None] + }, + } + } else { + vec![None] + } + }); + } + for permut in cartesian(&foo) { + out.push(FlatTopic::Multiple(permut)); + } + }, + VariadicValue::Single(single) => { + if let Some(single) = single { + out.push(single.clone()); + } + }, + VariadicValue::Null => { + out.push(FlatTopic::Null); + }, + } + out + } + + /// Replace None values - aka wildcards - for the log input value in that position. + pub fn replace(&self, log: &Log, topic: FlatTopic) -> Option> { + let mut out: Vec = Vec::new(); + match topic { + VariadicValue::Single(value) => { + if let Some(value) = value { + out.push(value); + } + }, + VariadicValue::Multiple(value) => { + for (k, v) in value.into_iter().enumerate() { + if let Some(v) = v { + out.push(v); + } else { + out.push(log.topics[k].clone()); + } + } + }, + _ => {} + }; + if out.len() == 0 { + return None; + } + Some(out) + } + + pub fn filter_block_range( + &self, + block_number: u64 + ) -> bool { + let mut out = true; + let filter = self.filter.clone().unwrap(); + if let Some(from) = filter.from_block { + match from { + BlockNumber::Num(_) => { + if from.to_min_block_num().unwrap_or(0 as u64) > block_number { + out = false; + } + }, + _ => {} + } + } + if let Some(to) = filter.to_block { + match to { + BlockNumber::Num(_) => { + if to.to_min_block_num().unwrap_or(0 as u64) < block_number { + out = false; + } + }, + BlockNumber::Earliest => { + out = false; + }, + _ => {} + } + } + out + } + + pub fn filter_block_hash( + &self, + block_hash: H256 + ) -> bool { + if let Some(h) = self.filter.clone().unwrap().block_hash { + if h != block_hash { return false; } + } + true + } + + pub fn filter_address( + &self, + log: &Log + ) -> bool { + if let Some(input_address) = &self.filter.clone().unwrap().address { + match input_address { + VariadicValue::Single(x) => { + if log.address != *x { return false; } + }, + VariadicValue::Multiple(x) => { + if !x.contains(&log.address) { return false; } + }, + _ => { return true; } + } + } + true + } + + pub fn filter_topics( + &self, + log: &Log + ) -> bool { + let mut out: bool = true; + for topic in self.flat_topics.clone() { + match topic { + VariadicValue::Single(single) => { + if let Some(single) = single { + if !log.topics.starts_with(&vec![single]) { + out = false; + } + } + }, + VariadicValue::Multiple(_) => { + let replaced: Option> = self.replace(log, topic); + if let Some(replaced) = replaced { + out = false; + if log.topics.starts_with( + &replaced[..] + ) { + out = true; + } + } + }, + _ => { + out = true; + } + } + } + out + } +} + /// Results of the filter_changes RPC. #[derive(Debug, PartialEq)] pub enum FilterChanges { diff --git a/rpc/core/src/types/mod.rs b/rpc/core/src/types/mod.rs index 26bc3d39f..e0cba1809 100644 --- a/rpc/core/src/types/mod.rs +++ b/rpc/core/src/types/mod.rs @@ -38,7 +38,7 @@ pub use self::bytes::Bytes; pub use self::block::{RichBlock, Block, BlockTransactions, Header, RichHeader, Rich}; pub use self::block_number::BlockNumber; pub use self::call_request::CallRequest; -pub use self::filter::{Filter, FilterChanges, VariadicValue, FilterAddress, Topic}; +pub use self::filter::{Filter, FilterChanges, VariadicValue, FilterAddress, Topic, FilteredParams}; pub use self::index::Index; pub use self::log::Log; pub use self::receipt::Receipt; diff --git a/rpc/src/eth.rs b/rpc/src/eth.rs index cc81dfdd0..1a22e9c0f 100644 --- a/rpc/src/eth.rs +++ b/rpc/src/eth.rs @@ -32,7 +32,7 @@ use sp_blockchain::{Error as BlockChainError, HeaderMetadata, HeaderBackend}; use sc_network::{NetworkService, ExHashT}; use frontier_rpc_core::{EthApi as EthApiT, NetApi as NetApiT}; use frontier_rpc_core::types::{ - BlockNumber, Bytes, CallRequest, Filter, Index, Log, Receipt, RichBlock, + BlockNumber, Bytes, CallRequest, Filter, FilteredParams, Index, Log, Receipt, RichBlock, SyncStatus, SyncInfo, Transaction, Work, Rich, Block, BlockTransactions, VariadicValue }; use frontier_rpc_primitives::{EthereumRuntimeRPCApi, ConvertTransaction, TransactionStatus}; @@ -814,6 +814,7 @@ impl EthApiT for EthApi where fn logs(&self, filter: Filter) -> Result> { let mut blocks_and_statuses = Vec::new(); let mut ret = Vec::new(); + let params = FilteredParams::new(Some(filter.clone())); if let Some(hash) = filter.block_hash { let id = match self.load_hash(hash) @@ -872,42 +873,49 @@ impl EthApiT for EthApi where let logs = status.logs.clone(); let mut transaction_log_index: u32 = 0; let transaction_hash = status.transaction_hash; - for log in logs { + for ethereum_log in logs { + let mut log = Log { + address: ethereum_log.address.clone(), + topics: ethereum_log.topics.clone(), + data: Bytes(ethereum_log.data.clone()), + block_hash: None, + block_number: None, + transaction_hash: None, + transaction_index: None, + log_index: None, + transaction_log_index: None, + removed: false, + }; let mut add: bool = false; if let ( - Some(VariadicValue::Single(address)), - Some(VariadicValue::Multiple(topics)) + Some(VariadicValue::Single(_)), + Some(VariadicValue::Multiple(_)) ) = ( filter.address.clone(), filter.topics.clone(), ) { - if address == log.address && log.topics.starts_with(&topics) { + if !params.filter_address(&log) && params.filter_topics(&log) { add = true; } - } else if let Some(VariadicValue::Single(address)) = filter.address { - if address == log.address { + } else if let Some(VariadicValue::Single(_)) = filter.address { + if !params.filter_address(&log) { add = true; } - } else if let Some(VariadicValue::Multiple(topics)) = &filter.topics { - if log.topics.starts_with(&topics) { + } else if let Some(VariadicValue::Multiple(_)) = &filter.topics { + if params.filter_topics(&log) { add = true; } } else { add = true; } if add { - ret.push(Log { - address: log.address.clone(), - topics: log.topics.clone(), - data: Bytes(log.data.clone()), - block_hash: Some(block_hash), - block_number: Some(block.header.number.clone()), - transaction_hash: Some(transaction_hash), - transaction_index: Some(U256::from(status.transaction_index)), - log_index: Some(U256::from(block_log_index)), - transaction_log_index: Some(U256::from(transaction_log_index)), - removed: false, - }); + log.block_hash = Some(block_hash); + log.block_number = Some(block.header.number.clone()); + log.transaction_hash = Some(transaction_hash); + log.transaction_index = Some(U256::from(status.transaction_index)); + log.log_index = Some(U256::from(block_log_index)); + log.transaction_log_index = Some(U256::from(transaction_log_index)); + ret.push(log); } transaction_log_index += 1; block_log_index += 1; diff --git a/rpc/src/eth_pubsub.rs b/rpc/src/eth_pubsub.rs index 42dab5141..31143e9a5 100644 --- a/rpc/src/eth_pubsub.rs +++ b/rpc/src/eth_pubsub.rs @@ -19,7 +19,7 @@ use log::warn; use jsonrpc_pubsub::{typed::Subscriber, SubscriptionId, manager::SubscriptionManager}; use frontier_rpc_core::EthPubSubApi::{self as EthPubSubApiT}; use frontier_rpc_core::types::{ - BlockNumber, Rich, Header, Bytes, Log, FilterAddress, Topic, VariadicValue, + Rich, Header, Bytes, Log, FilteredParams, pubsub::{Kind, Params, Result as PubSubResult, PubSubSyncStatus} }; use ethereum_types::{H256, U256}; @@ -33,7 +33,6 @@ use jsonrpc_core::{Result as JsonRpcResult, futures::{Future, Sink}}; use frontier_rpc_primitives::{EthereumRuntimeRPCApi, TransactionStatus}; use sc_network::{NetworkService, ExHashT}; -use crate::internal_err; pub struct EthPubSubApi { _pool: Arc

, @@ -54,202 +53,6 @@ impl EthPubSubApi { } } -impl EthPubSubApi where - B: BlockT + Send + Sync + 'static, - P: TransactionPool + Send + Sync + 'static, - C: ProvideRuntimeApi + StorageProvider + - BlockchainEvents + AuxStore, - C: HeaderBackend + HeaderMetadata + 'static, - C::Api: EthereumRuntimeRPCApi, - C: Send + Sync + 'static, - BE: Backend + 'static, - BE::State: StateBackend, -{ - fn native_block_number(&self, number: Option) -> JsonRpcResult> { - let mut native_number: Option = None; - - if let Some(number) = number { - match number { - BlockNumber::Hash { hash, .. } => { - let id = self.load_hash(hash).unwrap_or(None); - if let Some(id) = id { - if let Ok(Some(block)) = self.client.runtime_api().current_block(&id) { - native_number = Some(block.header.number.as_u32()); - } - } - }, - BlockNumber::Num(_) => { - if let Some(number) = number.to_min_block_num() { - native_number = Some(number.unique_saturated_into()); - } - }, - BlockNumber::Latest => { - native_number = Some( - self.client.info().best_number.clone().unique_saturated_into() as u32 - ); - }, - BlockNumber::Earliest => { - native_number = Some(0); - }, - BlockNumber::Pending => { - native_number = None; - } - }; - } else { - native_number = Some( - self.client.info().best_number.clone().unique_saturated_into() as u32 - ); - } - Ok(native_number) - } - - fn filter_block( - &self, - params: Option - ) -> (Option, Option) { - if let Some(Params::Logs(f)) = params { - let to_block: Option = if f.to_block.is_some() { - self.native_block_number(f.to_block).unwrap_or(None) - } else { - None - }; - return ( - self.native_block_number(f.from_block).unwrap_or(None), - to_block - ); - } - (None, None) - } - - // Asumes there is only one mapped canonical block in the AuxStore, otherwise something is wrong - fn load_hash(&self, hash: H256) -> JsonRpcResult>> { - let hashes = match frontier_consensus::load_block_hash::(self.client.as_ref(), hash) - .map_err(|err| internal_err(format!("fetch aux store failed: {:?}", err)))? - { - Some(hashes) => hashes, - None => return Ok(None), - }; - let out: Vec = hashes.into_iter() - .filter_map(|h| { - if let Ok(Some(_)) = self.client.header(BlockId::Hash(h)) { - Some(h) - } else { - None - } - }).collect(); - - if out.len() == 1 { - return Ok(Some( - BlockId::Hash(out[0]) - )); - } - Ok(None) - } -} - -struct FilteredParams { - from_block: Option, - to_block: Option, - block_hash: Option, - address: Option, - topics: Option -} - -impl FilteredParams { - pub fn new( - params: Option, - block_range: (Option, Option) - ) -> Self { - if let Some(Params::Logs(d)) = params { - return FilteredParams { - from_block: block_range.0, - to_block: block_range.1, - block_hash: d.block_hash, - address: d.address, - topics: d.topics - }; - } - FilteredParams { - from_block: None, - to_block: None, - block_hash: None, - address: None, - topics: None - } - } - - pub fn filter_block_range( - &self, - block: ðereum::Block - ) -> bool { - let mut out = true; - let number: u32 = UniqueSaturatedInto::::unique_saturated_into( - block.header.number - ); - if let Some(from) = self.from_block { - if from > number { - out = false; - } - } - if let Some(to) = self.to_block { - if to < number { - out = false; - } - } - out - } - - fn filter_block_hash( - &self, - block_hash: H256 - ) -> bool { - if let Some(h) = self.block_hash { - if h != block_hash { return false; } - } - true - } - - fn filter_address( - &self, - log: ðereum::Log - ) -> bool { - if let Some(input_address) = &self.address { - match input_address { - VariadicValue::Single(x) => { - if log.address != *x { return false; } - }, - VariadicValue::Multiple(x) => { - if !x.contains(&log.address) { return false; } - }, - _ => { return true; } - } - } - true - } - - fn filter_topics( - &self, - log: ðereum::Log - ) -> bool { - if let Some(input_topics) = &self.topics { - match input_topics { - VariadicValue::Single(x) => { - if !log.topics.starts_with(&vec![*x]) { - return false; - } - }, - VariadicValue::Multiple(x) => { - if !log.topics.starts_with(&x) { - return false; - } - }, - _ => { return true; } - } - } - true - } -} - struct SubscriptionResult {} impl SubscriptionResult { pub fn new() -> Self { SubscriptionResult{} } @@ -343,14 +146,31 @@ impl SubscriptionResult { fn add_log( &self, block_hash: H256, - log: ðereum::Log, + ethereum_log: ðereum::Log, block: ðereum::Block, params: &FilteredParams ) -> bool { - if !params.filter_block_range(block) || - !params.filter_block_hash(block_hash) || - !params.filter_address(log) || !params.filter_topics(log) { - return false; + let log = Log { + address: ethereum_log.address.clone(), + topics: ethereum_log.topics.clone(), + data: Bytes(ethereum_log.data.clone()), + block_hash: None, + block_number: None, + transaction_hash: None, + transaction_index: None, + log_index: None, + transaction_log_index: None, + removed: false, + }; + if let Some(_) = params.filter { + let block_number = UniqueSaturatedInto::::unique_saturated_into( + block.header.number + ); + if !params.filter_block_range(block_number) || + !params.filter_block_hash(block_hash) || + !params.filter_address(&log) || !params.filter_topics(&log) { + return false; + } } true } @@ -395,8 +215,11 @@ impl EthPubSubApiT for EthPubSubApi, ) { - let filter_block = self.filter_block(params.clone()); - let filtered_params = FilteredParams::new(params, filter_block); + let filtered_params = match params { + Some(Params::Logs(filter)) => FilteredParams::new(Some(filter)), + _ => FilteredParams::default() + }; + let client = self.client.clone(); let network = self.network.clone(); match kind {