From f98fdeee360d49113a93db594644f2a6cfd6529d Mon Sep 17 00:00:00 2001 From: sistemd Date: Mon, 25 Nov 2024 13:30:46 +0100 Subject: [PATCH 1/2] use aggregate bloom filters in P2P and `SubsribeEvents` --- crates/pathfinder/Cargo.toml | 4 +- crates/pathfinder/src/state/sync.rs | 2 +- crates/pathfinder/src/sync/checkpoint.rs | 20 ++- crates/pathfinder/src/sync/events.rs | 13 ++ crates/rpc/src/method/subscribe_events.rs | 40 ++++- crates/storage/src/connection.rs | 19 +- crates/storage/src/connection/event.rs | 206 ++++++++++++++++------ crates/storage/src/lib.rs | 21 +-- 8 files changed, 238 insertions(+), 87 deletions(-) diff --git a/crates/pathfinder/Cargo.toml b/crates/pathfinder/Cargo.toml index 86ee8836a8..66ee386889 100644 --- a/crates/pathfinder/Cargo.toml +++ b/crates/pathfinder/Cargo.toml @@ -44,9 +44,9 @@ pathfinder-ethereum = { path = "../ethereum" } pathfinder-executor = { path = "../executor" } pathfinder-merkle-tree = { path = "../merkle-tree" } pathfinder-retry = { path = "../retry" } -pathfinder-rpc = { path = "../rpc" } +pathfinder-rpc = { path = "../rpc", features = ["aggregate_bloom"] } pathfinder-serde = { path = "../serde" } -pathfinder-storage = { path = "../storage" } +pathfinder-storage = { path = "../storage", features = ["aggregate_bloom"] } primitive-types = { workspace = true } rand = { workspace = true } rayon = { workspace = true } diff --git a/crates/pathfinder/src/state/sync.rs b/crates/pathfinder/src/state/sync.rs index 08bc24ecbc..9715085e4f 100644 --- a/crates/pathfinder/src/state/sync.rs +++ b/crates/pathfinder/src/state/sync.rs @@ -1084,7 +1084,7 @@ async fn l2_reorg( #[cfg(feature = "aggregate_bloom")] transaction - .reconstruct_running_aggregate() + .reconstruct_running_event_filter() .context("Reconstructing running aggregate bloom")?; // Track combined L1 and L2 state. diff --git a/crates/pathfinder/src/sync/checkpoint.rs b/crates/pathfinder/src/sync/checkpoint.rs index d95f0bd942..bfedb89785 100644 --- a/crates/pathfinder/src/sync/checkpoint.rs +++ b/crates/pathfinder/src/sync/checkpoint.rs @@ -281,6 +281,24 @@ impl Sync { return Ok(()); }; + // TODO: + // Replace `start` with the code below once individual aggregate filters + // are removed. + #[cfg(feature = "aggregate_bloom")] + { + if let Some(start_aggregate) = + events::next_missing_aggregate(self.storage.clone(), stop)? + { + if start_aggregate != start { + tracing::error!( + "Running event filter block mismatch. Expected: {}, got: {}", + start, + start_aggregate + ); + } + } + } + let event_stream = self.p2p.clone().event_stream( start, stop, @@ -667,7 +685,7 @@ async fn rollback_to_anchor( #[cfg(feature = "aggregate_bloom")] transaction - .reconstruct_running_aggregate() + .reconstruct_running_event_filter() .context("Reconstructing running aggregate bloom")?; Ok(()) diff --git a/crates/pathfinder/src/sync/events.rs b/crates/pathfinder/src/sync/events.rs index 6e8c52a04a..6726f77f18 100644 --- a/crates/pathfinder/src/sync/events.rs +++ b/crates/pathfinder/src/sync/events.rs @@ -50,6 +50,19 @@ pub(super) async fn next_missing( .context("Joining blocking task")? } +#[cfg(feature = "aggregate_bloom")] +pub(super) fn next_missing_aggregate( + storage: Storage, + head: BlockNumber, +) -> anyhow::Result> { + let next = storage + .connection()? + .transaction()? + .next_block_without_events(); + + Ok((next < head).then_some(next)) +} + pub(super) fn get_counts( db: pathfinder_storage::Transaction<'_>, start: BlockNumber, diff --git a/crates/rpc/src/method/subscribe_events.rs b/crates/rpc/src/method/subscribe_events.rs index bfb5796bd4..f1b9e9a8a7 100644 --- a/crates/rpc/src/method/subscribe_events.rs +++ b/crates/rpc/src/method/subscribe_events.rs @@ -103,13 +103,39 @@ impl RpcSubscriptionFlow for SubscribeEvents { let (events, last_block) = tokio::task::spawn_blocking(move || -> Result<_, RpcError> { let mut conn = storage.connection().map_err(RpcError::InternalError)?; let db = conn.transaction().map_err(RpcError::InternalError)?; - db.events_in_range( - from, - to, - params.from_address, - params.keys.unwrap_or_default(), - ) - .map_err(RpcError::InternalError) + let events = db + .events_in_range( + from, + to, + params.from_address, + #[cfg(feature = "aggregate_bloom")] + params.keys.clone().unwrap_or_default(), + #[cfg(not(feature = "aggregate_bloom"))] + params.keys.unwrap_or_default(), + ) + .map_err(RpcError::InternalError)?; + + #[cfg(feature = "aggregate_bloom")] + { + let events_from_aggregate = db + .events_in_range_aggregate( + from, + to, + params.from_address, + params.keys.unwrap_or_default(), + ) + .unwrap(); + + assert_eq!(events.0.len(), events_from_aggregate.0.len()); + for (event, event_from_aggregate) in + events.0.iter().zip(events_from_aggregate.0.iter()) + { + assert_eq!(event, event_from_aggregate); + } + assert_eq!(events.1, events_from_aggregate.1); + } + + Ok(events) }) .await .map_err(|e| RpcError::InternalError(e.into()))??; diff --git a/crates/storage/src/connection.rs b/crates/storage/src/connection.rs index 46ead118f0..5eb835a929 100644 --- a/crates/storage/src/connection.rs +++ b/crates/storage/src/connection.rs @@ -5,7 +5,7 @@ use std::sync::Mutex; mod block; mod class; mod ethereum; -pub(crate) mod event; +pub mod event; mod reference; mod reorg_counter; mod signature; @@ -13,6 +13,8 @@ mod state_update; pub(crate) mod transaction; mod trie; +#[cfg(feature = "aggregate_bloom")] +use event::RunningEventFilter; pub use event::{ EmittedEvent, EventFilter, @@ -29,16 +31,13 @@ pub(crate) use reorg_counter::ReorgCounter; pub use rusqlite::TransactionBehavior; pub use trie::{Node, NodeRef, RootIndexUpdate, StoredNode, TrieUpdate}; -#[cfg(feature = "aggregate_bloom")] -use crate::bloom::AggregateBloom; - type PooledConnection = r2d2::PooledConnection; pub struct Connection { connection: PooledConnection, bloom_filter_cache: Arc, #[cfg(feature = "aggregate_bloom")] - running_aggregate: Arc>, + running_event_filter: Arc>, trie_prune_mode: TriePruneMode, } @@ -46,14 +45,14 @@ impl Connection { pub(crate) fn new( connection: PooledConnection, bloom_filter_cache: Arc, - #[cfg(feature = "aggregate_bloom")] running_aggregate: Arc>, + #[cfg(feature = "aggregate_bloom")] running_event_filter: Arc>, trie_prune_mode: TriePruneMode, ) -> Self { Self { connection, bloom_filter_cache, #[cfg(feature = "aggregate_bloom")] - running_aggregate, + running_event_filter, trie_prune_mode, } } @@ -64,7 +63,7 @@ impl Connection { transaction: tx, bloom_filter_cache: self.bloom_filter_cache.clone(), #[cfg(feature = "aggregate_bloom")] - running_aggregate: self.running_aggregate.clone(), + running_event_filter: self.running_event_filter.clone(), trie_prune_mode: self.trie_prune_mode, }) } @@ -78,7 +77,7 @@ impl Connection { transaction: tx, bloom_filter_cache: self.bloom_filter_cache.clone(), #[cfg(feature = "aggregate_bloom")] - running_aggregate: self.running_aggregate.clone(), + running_event_filter: self.running_event_filter.clone(), trie_prune_mode: self.trie_prune_mode, }) } @@ -88,7 +87,7 @@ pub struct Transaction<'inner> { transaction: rusqlite::Transaction<'inner>, bloom_filter_cache: Arc, #[cfg(feature = "aggregate_bloom")] - running_aggregate: Arc>, + running_event_filter: Arc>, trie_prune_mode: TriePruneMode, } diff --git a/crates/storage/src/connection/event.rs b/crates/storage/src/connection/event.rs index aa80f6ef02..304ed2248c 100644 --- a/crates/storage/src/connection/event.rs +++ b/crates/storage/src/connection/event.rs @@ -69,17 +69,10 @@ pub struct PageOfEvents { impl Transaction<'_> { #[cfg(feature = "aggregate_bloom")] - pub fn reconstruct_running_aggregate(&self) -> anyhow::Result<()> { - let aggregate = reconstruct_running_aggregate(self.inner())?; - let mut running_aggregate = match self.running_aggregate.lock() { - Ok(guard) => guard, - Err(poisoned) => { - tracing::error!("Poisoned lock in reconstruct_running_aggregate"); - poisoned.into_inner() - } - }; - - *running_aggregate = aggregate; + pub fn reconstruct_running_event_filter(&self) -> anyhow::Result<()> { + let event_filter = reconstruct_running_event_filter(self.inner())?; + let mut running_event_filter = self.running_event_filter.lock().unwrap(); + *running_event_filter = event_filter; Ok(()) } @@ -105,7 +98,7 @@ impl Transaction<'_> { ", )?; - let mut running_aggregate = self.running_aggregate.lock().unwrap(); + let mut running_event_filter = self.running_event_filter.lock().unwrap(); let mut bloom = BloomFilter::new(); for event in events { @@ -113,17 +106,22 @@ impl Transaction<'_> { bloom.set_address(&event.from_address); } - running_aggregate.add_bloom(&bloom, block_number); + running_event_filter.filter.add_bloom(&bloom, block_number); + running_event_filter.next_block = block_number + 1; + // This check is the reason that blocks cannot be skipped, if they were we would // risk missing the last block of the current aggregate's range. - if block_number == running_aggregate.to_block { + if block_number == running_event_filter.filter.to_block { stmt.execute(params![ - &running_aggregate.from_block, - &running_aggregate.to_block, - &running_aggregate.compress_bitmap() + &running_event_filter.filter.from_block, + &running_event_filter.filter.to_block, + &running_event_filter.filter.compress_bitmap() ])?; - *running_aggregate = AggregateBloom::new(running_aggregate.to_block + 1); + *running_event_filter = RunningEventFilter { + filter: AggregateBloom::new(block_number + 1), + next_block: block_number + 1, + }; } Ok(()) @@ -220,6 +218,101 @@ impl Transaction<'_> { } } + #[cfg(feature = "aggregate_bloom")] + pub fn events_in_range_aggregate( + &self, + from_block: BlockNumber, + to_block: BlockNumber, + contract_address: Option, + keys: Vec>, + ) -> anyhow::Result<(Vec, Option)> { + let Some(latest_block) = self.block_number(crate::BlockId::Latest)? else { + // No blocks in the database + return Ok((vec![], None)); + }; + if from_block > latest_block { + return Ok((vec![], None)); + } + + let filter = EventFilter { + contract_address, + keys, + page_size: usize::MAX - 1, + ..Default::default() + }; + + let aggregates = self.load_aggregate_bloom_range(from_block, to_block)?; + + let blocks_to_scan = aggregates + .iter() + .flat_map(|aggregate| aggregate.blocks_for_filter(&filter)) + .filter(|&block| (from_block..=to_block).contains(&block)); + + let key_filter_is_empty = filter.keys.iter().flatten().count() == 0; + let keys: Vec> = filter + .keys + .iter() + .map(|keys| keys.iter().collect()) + .collect(); + + let mut emitted_events = vec![]; + + for block in blocks_to_scan { + let Some(block_header) = self.block_header(crate::BlockId::Number(block))? else { + break; + }; + + let events = match self.events_for_block(block.into())? { + Some(events) => events, + // Reached the end of P2P (checkpoint) synced events. + None => break, + }; + + let events = events + .into_iter() + .flat_map(|(transaction_hash, events)| { + events.into_iter().zip(std::iter::repeat(transaction_hash)) + }) + .filter(|(event, _)| match filter.contract_address { + Some(address) => event.from_address == address, + None => true, + }) + .filter(|(event, _)| { + if key_filter_is_empty { + return true; + } + + if event.keys.len() < keys.len() { + return false; + } + + event + .keys + .iter() + .zip(keys.iter()) + .all(|(key, filter)| filter.is_empty() || filter.contains(key)) + }) + .map(|(event, tx_hash)| EmittedEvent { + data: event.data.clone(), + keys: event.keys.clone(), + from_address: event.from_address, + block_hash: block_header.hash, + block_number: block_header.number, + transaction_hash: tx_hash, + }); + + emitted_events.extend(events); + } + + let last_scanned_block = if latest_block > to_block { + to_block + } else { + latest_block + }; + + Ok((emitted_events, Some(last_scanned_block))) + } + #[tracing::instrument(skip(self))] pub fn events( &self, @@ -412,7 +505,7 @@ impl Transaction<'_> { let events = match self.events_for_block(block.into())? { Some(events) => events, - // Reached the end of P2P synced events. + // Reached the end of P2P (checkpoint) synced events. None => { return Ok(PageOfEvents { events: emitted_events, @@ -611,10 +704,7 @@ impl Transaction<'_> { }) } - // TODO: - // Use this for `SubscribeEvents` #[cfg(feature = "aggregate_bloom")] - #[allow(dead_code)] fn load_aggregate_bloom_range( &self, start_block: BlockNumber, @@ -655,14 +745,8 @@ impl Transaction<'_> { let should_include_running = aggregates.last().map_or(true, |a| end_block > a.to_block); if should_include_running { - let running_aggregate = match self.running_aggregate.lock() { - Ok(guard) => guard, - Err(poisoned) => { - tracing::error!("Poisoned lock in load_aggregate_bloom_range"); - poisoned.into_inner() - } - }; - aggregates.push(running_aggregate.clone()); + let running_event_filter = self.running_event_filter.lock().unwrap(); + aggregates.push(running_event_filter.filter.clone()); } Ok(aggregates) @@ -728,18 +812,24 @@ impl Transaction<'_> { let load_limit_reached = total_aggregate_filters > max_bloom_filters_to_load.get() as u64; if should_include_running && !load_limit_reached { - let running_aggregate = match self.running_aggregate.lock() { - Ok(guard) => guard, - Err(poisoned) => { - tracing::error!("Poisoned lock in load_aggregate_bloom_range"); - poisoned.into_inner() - } - }; - aggregates.push(running_aggregate.clone()); + let running_event_filter = self.running_event_filter.lock().unwrap(); + aggregates.push(running_event_filter.filter.clone()); } Ok((aggregates, load_limit_reached)) } + + #[cfg(feature = "aggregate_bloom")] + pub fn next_block_without_events(&self) -> BlockNumber { + self.running_event_filter.lock().unwrap().next_block + } +} + +#[cfg(feature = "aggregate_bloom")] +#[derive(Debug)] +pub(crate) struct RunningEventFilter { + filter: AggregateBloom, + next_block: BlockNumber, } /// Reconstruct the [aggregate](crate::bloom::AggregateBloom) for the range of @@ -750,9 +840,9 @@ impl Transaction<'_> { /// range is complete, before that it is kept in memory and can be lost upon /// shutdown. #[cfg(feature = "aggregate_bloom")] -pub fn reconstruct_running_aggregate( +pub(crate) fn reconstruct_running_event_filter( tx: &rusqlite::Transaction<'_>, -) -> anyhow::Result { +) -> anyhow::Result { use super::transaction; let mut last_to_block_stmt = tx.prepare( @@ -766,7 +856,7 @@ pub fn reconstruct_running_aggregate( r" SELECT events FROM transactions - WHERE block_number >= :first_running_aggregate_block + WHERE block_number >= :first_runnining_event_filter_block ", )?; @@ -775,7 +865,7 @@ pub fn reconstruct_running_aggregate( .optional() .context("Querying last stored aggregate to_block")?; - let first_running_aggregate_block = match last_to_block { + let first_runnining_event_filter_block = match last_to_block { Some(last_to_block) => BlockNumber::new_or_panic(last_to_block + 1), // Aggregate Bloom filter table is empty -> reconstruct running aggregate // from the genesis block. @@ -784,7 +874,7 @@ pub fn reconstruct_running_aggregate( let events_to_reconstruct: Vec>>> = events_to_reconstruct_stmt .query_and_then( - named_params![":first_running_aggregate_block": &first_running_aggregate_block], + named_params![":first_runnining_event_filter_block": &first_runnining_event_filter_block], |row| { let events: Option = row .get_optional_blob(0)? @@ -812,23 +902,27 @@ pub fn reconstruct_running_aggregate( .context("Querying events to reconstruct")? .collect::>()?; - let mut running_aggregate = AggregateBloom::new(first_running_aggregate_block); + let mut aggregate = AggregateBloom::new(first_runnining_event_filter_block); for (block, events_for_block) in events_to_reconstruct.iter().enumerate() { - if let Some(events) = events_for_block { - let block_number = first_running_aggregate_block + block as u64; - - let mut bloom = BloomFilter::new(); - for event in events.iter().flatten() { - bloom.set_keys(&event.keys); - bloom.set_address(&event.from_address); - } + let Some(events) = events_for_block else { + break; + }; - running_aggregate.add_bloom(&bloom, block_number); + let mut bloom = BloomFilter::new(); + for event in events.iter().flatten() { + bloom.set_keys(&event.keys); + bloom.set_address(&event.from_address); } + + let block_number = first_runnining_event_filter_block + block as u64; + aggregate.add_bloom(&bloom, block_number); } - Ok(running_aggregate) + Ok(RunningEventFilter { + filter: aggregate, + next_block: first_runnining_event_filter_block + events_to_reconstruct.len() as u64, + }) } fn continuation_token( @@ -1862,10 +1956,10 @@ mod tests { .unwrap(); assert_eq!(inserted_aggregate_filter_count, 2); - let running_aggregate = tx.running_aggregate.lock().unwrap(); + let running_event_filter = tx.running_event_filter.lock().unwrap(); // Running aggregate starts from next block range. assert_eq!( - running_aggregate.from_block, + running_event_filter.filter.from_block, 2 * AggregateBloom::BLOCK_RANGE_LEN ); } diff --git a/crates/storage/src/lib.rs b/crates/storage/src/lib.rs index ec2209d50d..1a5242d121 100644 --- a/crates/storage/src/lib.rs +++ b/crates/storage/src/lib.rs @@ -20,10 +20,10 @@ use std::sync::Arc; use std::sync::Mutex; use anyhow::Context; -#[cfg(feature = "aggregate_bloom")] -use bloom::AggregateBloom; pub use bloom::EVENT_KEY_FILTER_LIMIT; pub use connection::*; +#[cfg(feature = "aggregate_bloom")] +use event::RunningEventFilter; use pathfinder_common::{BlockHash, BlockNumber}; use r2d2::Pool; use r2d2_sqlite::SqliteConnectionManager; @@ -96,7 +96,7 @@ struct Inner { pool: Pool, bloom_filter_cache: Arc, #[cfg(feature = "aggregate_bloom")] - running_aggregate: Arc>, + running_event_filter: Arc>, trie_prune_mode: TriePruneMode, } @@ -105,7 +105,7 @@ pub struct StorageManager { journal_mode: JournalMode, bloom_filter_cache: Arc, #[cfg(feature = "aggregate_bloom")] - running_aggregate: Arc>, + running_event_filter: Arc>, trie_prune_mode: TriePruneMode, } @@ -138,7 +138,7 @@ impl StorageManager { pool, bloom_filter_cache: self.bloom_filter_cache.clone(), #[cfg(feature = "aggregate_bloom")] - running_aggregate: self.running_aggregate.clone(), + running_event_filter: self.running_event_filter.clone(), trie_prune_mode: self.trie_prune_mode, })) } @@ -290,8 +290,9 @@ impl StorageBuilder { } #[cfg(feature = "aggregate_bloom")] - let running_aggregate = event::reconstruct_running_aggregate(&connection.transaction()?) - .context("Reconstructing running aggregate bloom filter")?; + let running_event_filter = + event::reconstruct_running_event_filter(&connection.transaction()?) + .context("Reconstructing running aggregate bloom filter")?; connection .close() @@ -303,7 +304,7 @@ impl StorageBuilder { journal_mode: self.journal_mode, bloom_filter_cache: Arc::new(bloom::Cache::with_size(self.bloom_filter_cache_size)), #[cfg(feature = "aggregate_bloom")] - running_aggregate: Arc::new(Mutex::new(running_aggregate)), + running_event_filter: Arc::new(Mutex::new(running_event_filter)), trie_prune_mode, }) } @@ -376,7 +377,7 @@ impl Storage { conn, self.0.bloom_filter_cache.clone(), #[cfg(feature = "aggregate_bloom")] - self.0.running_aggregate.clone(), + self.0.running_event_filter.clone(), self.0.trie_prune_mode, )) } @@ -651,7 +652,7 @@ mod tests { #[test] #[cfg(feature = "aggregate_bloom")] - fn running_aggregate_reconstructed_after_shutdown() { + fn running_event_filter_reconstructed_after_shutdown() { use std::num::NonZeroUsize; use std::sync::LazyLock; From f698bb89ada3139ab6056e716a8b6666ec562e7c Mon Sep 17 00:00:00 2001 From: sistemd Date: Wed, 27 Nov 2024 16:30:11 +0100 Subject: [PATCH 2/2] fix problem with features --- .github/workflows/ci.yml | 1 + crates/pathfinder/Cargo.toml | 6 +++--- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index abef173062..1929cfa89f 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -73,6 +73,7 @@ jobs: rm -rf ~/.cargo/git - run: | cargo clippy --workspace --all-targets --all-features --locked -- -D warnings -D rust_2018_idioms + cargo clippy --workspace --all-targets --locked -- -D warnings -D rust_2018_idioms cargo clippy --workspace --all-targets --all-features --locked --manifest-path crates/load-test/Cargo.toml -- -D warnings -D rust_2018_idioms diff --git a/crates/pathfinder/Cargo.toml b/crates/pathfinder/Cargo.toml index 66ee386889..b284079ea9 100644 --- a/crates/pathfinder/Cargo.toml +++ b/crates/pathfinder/Cargo.toml @@ -14,7 +14,7 @@ path = "src/lib.rs" [features] tokio-console = ["console-subscriber", "tokio/tracing"] p2p = [] -aggregate_bloom = [] +aggregate_bloom = ["pathfinder-storage/aggregate_bloom", "pathfinder-rpc/aggregate_bloom"] [dependencies] anyhow = { workspace = true } @@ -44,9 +44,9 @@ pathfinder-ethereum = { path = "../ethereum" } pathfinder-executor = { path = "../executor" } pathfinder-merkle-tree = { path = "../merkle-tree" } pathfinder-retry = { path = "../retry" } -pathfinder-rpc = { path = "../rpc", features = ["aggregate_bloom"] } +pathfinder-rpc = { path = "../rpc" } pathfinder-serde = { path = "../serde" } -pathfinder-storage = { path = "../storage", features = ["aggregate_bloom"] } +pathfinder-storage = { path = "../storage" } primitive-types = { workspace = true } rand = { workspace = true } rayon = { workspace = true }