feat(bloom): use aggregate bloom filters in P2P and starknet_subscribeEvents #2405

Merged (2 commits) on Nov 27, 2024
1 change: 1 addition & 0 deletions .github/workflows/ci.yml
@@ -73,6 +73,7 @@ jobs:
rm -rf ~/.cargo/git
- run: |
cargo clippy --workspace --all-targets --all-features --locked -- -D warnings -D rust_2018_idioms
cargo clippy --workspace --all-targets --locked -- -D warnings -D rust_2018_idioms
cargo clippy --workspace --all-targets --all-features --locked --manifest-path crates/load-test/Cargo.toml -- -D warnings -D rust_2018_idioms
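Running clippy both with and without `--all-features` matters once a feature gate like `aggregate_bloom` exists: assuming the feature is not enabled by default, each invocation compiles, and therefore lints, only one side of the gate. A purely illustrative sketch (the function name is made up, and it assumes the crate declares the feature):

```rust
// Only the `--all-features` clippy run sees this body.
#[cfg(feature = "aggregate_bloom")]
fn query_events() {
    // aggregate-filter code path
}

// Only the default-features clippy run sees this body.
#[cfg(not(feature = "aggregate_bloom"))]
fn query_events() {
    // per-block-filter code path
}
```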
2 changes: 1 addition & 1 deletion crates/pathfinder/Cargo.toml
@@ -14,7 +14,7 @@ path = "src/lib.rs"
[features]
tokio-console = ["console-subscriber", "tokio/tracing"]
p2p = []
aggregate_bloom = []
aggregate_bloom = ["pathfinder-storage/aggregate_bloom", "pathfinder-rpc/aggregate_bloom"]

[dependencies]
anyhow = { workspace = true }
2 changes: 1 addition & 1 deletion crates/pathfinder/src/state/sync.rs
@@ -1084,7 +1084,7 @@ async fn l2_reorg(

#[cfg(feature = "aggregate_bloom")]
transaction
.reconstruct_running_aggregate()
.reconstruct_running_event_filter()
.context("Reconstructing running aggregate bloom")?;

// Track combined L1 and L2 state.
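For context on what `reconstruct_running_event_filter` is rebuilding: an aggregate Bloom filter covers a fixed range of blocks rather than a single block, so the filter for the range containing the chain head is only partially filled and lives in memory. A reorg invalidates that in-memory state, and it has to be replayed from the per-block data still in the database. The sketch below is a simplified model of the idea with assumed sizes and field names; it is not pathfinder's actual `AggregateBloom` or `RunningEventFilter` type.

```rust
// Simplified model: for each Bloom-filter bit, record which blocks of the
// covered range set that bit, so a query can skip whole blocks at once.
const BLOCKS_PER_FILTER: usize = 8192; // assumed range size, illustration only
const FILTER_BITS: usize = 2048;       // assumed Bloom width, illustration only

struct RunningFilterSketch {
    from_block: u64,
    // rows[bit][offset] == true if block `from_block + offset` set `bit`.
    rows: Vec<Vec<bool>>,
}

impl RunningFilterSketch {
    /// Rebuild the in-memory filter after a reorg by replaying the Bloom
    /// bits of the blocks that survived the rollback.
    fn reconstruct(
        from_block: u64,
        head: u64,
        bits_of_block: impl Fn(u64) -> Vec<usize>,
    ) -> Self {
        let mut rows = vec![vec![false; BLOCKS_PER_FILTER]; FILTER_BITS];
        for block in from_block..=head {
            let offset = (block - from_block) as usize;
            for bit in bits_of_block(block) {
                rows[bit][offset] = true;
            }
        }
        Self { from_block, rows }
    }
}
```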
20 changes: 19 additions & 1 deletion crates/pathfinder/src/sync/checkpoint.rs
@@ -281,6 +281,24 @@ impl Sync {
return Ok(());
};

// TODO:
// Replace `start` with the code below once individual aggregate filters
// are removed.
#[cfg(feature = "aggregate_bloom")]
{
if let Some(start_aggregate) =
events::next_missing_aggregate(self.storage.clone(), stop)?
{
if start_aggregate != start {
tracing::error!(
"Running event filter block mismatch. Expected: {}, got: {}",
start,
start_aggregate
);
}
}
}

let event_stream = self.p2p.clone().event_stream(
start,
stop,
@@ -667,7 +685,7 @@ async fn rollback_to_anchor(

#[cfg(feature = "aggregate_bloom")]
transaction
.reconstruct_running_aggregate()
.reconstruct_running_event_filter()
.context("Reconstructing running aggregate bloom")?;

Ok(())
13 changes: 13 additions & 0 deletions crates/pathfinder/src/sync/events.rs
@@ -50,6 +50,19 @@ pub(super) async fn next_missing(
.context("Joining blocking task")?
}

#[cfg(feature = "aggregate_bloom")]
pub(super) fn next_missing_aggregate(
storage: Storage,
head: BlockNumber,
) -> anyhow::Result<Option<BlockNumber>> {
let next = storage
.connection()?
.transaction()?
.next_block_without_events();

Ok((next < head).then_some(next))
}

pub(super) fn get_counts(
db: pathfinder_storage::Transaction<'_>,
start: BlockNumber,
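`next_missing_aggregate` mirrors the contract of the existing `next_missing`: assuming `next_block_without_events` returns the first block whose events are not yet stored, any value strictly below `head` means there is still a gap to sync, and anything at or above `head` means the node is caught up. A standalone restatement of the comparison, with plain integers standing in for `BlockNumber`:

```rust
fn next_missing(next: u64, head: u64) -> Option<u64> {
    (next < head).then_some(next)
}

fn main() {
    assert_eq!(next_missing(10, 100), Some(10)); // gap: resume syncing at block 10
    assert_eq!(next_missing(100, 100), None);    // already at (or past) the head
}
```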
40 changes: 33 additions & 7 deletions crates/rpc/src/method/subscribe_events.rs
@@ -103,13 +103,39 @@ impl RpcSubscriptionFlow for SubscribeEvents {
let (events, last_block) = tokio::task::spawn_blocking(move || -> Result<_, RpcError> {
let mut conn = storage.connection().map_err(RpcError::InternalError)?;
let db = conn.transaction().map_err(RpcError::InternalError)?;
db.events_in_range(
from,
to,
params.from_address,
params.keys.unwrap_or_default(),
)
.map_err(RpcError::InternalError)
let events = db
.events_in_range(
from,
to,
params.from_address,
#[cfg(feature = "aggregate_bloom")]
params.keys.clone().unwrap_or_default(),
#[cfg(not(feature = "aggregate_bloom"))]
params.keys.unwrap_or_default(),
)
.map_err(RpcError::InternalError)?;

#[cfg(feature = "aggregate_bloom")]
{
let events_from_aggregate = db
.events_in_range_aggregate(
from,
to,
params.from_address,
params.keys.unwrap_or_default(),
)
.unwrap();

assert_eq!(events.0.len(), events_from_aggregate.0.len());
for (event, event_from_aggregate) in
events.0.iter().zip(events_from_aggregate.0.iter())
{
assert_eq!(event, event_from_aggregate);
}
assert_eq!(events.1, events_from_aggregate.1);
}

Ok(events)
})
.await
.map_err(|e| RpcError::InternalError(e.into()))??;
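The block added under `#[cfg(feature = "aggregate_bloom")]` is a shadow read: the subscription still answers from the existing per-block Bloom path, but the same query is replayed against the aggregate path and the results are asserted equal, so any divergence surfaces while the feature is still opt-in. The helper below is not from this PR; it only shows the general shape of that pattern (the PR uses hard `assert_eq!`s, which fail loudly in release builds as well).

```rust
// Hypothetical helper illustrating a shadow-read check: serve the old path's
// result, but compare it against the new path first.
fn shadow_checked<T, E>(
    old: impl FnOnce() -> Result<T, E>,
    new: impl FnOnce() -> Result<T, E>,
) -> Result<T, E>
where
    T: PartialEq + std::fmt::Debug,
{
    let old_result = old()?;
    if let Ok(new_result) = new() {
        debug_assert_eq!(old_result, new_result, "old and new query paths disagree");
    }
    Ok(old_result)
}
```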
19 changes: 9 additions & 10 deletions crates/storage/src/connection.rs
@@ -5,14 +5,16 @@ use std::sync::Mutex;
mod block;
mod class;
mod ethereum;
pub(crate) mod event;
pub mod event;
mod reference;
mod reorg_counter;
mod signature;
mod state_update;
pub(crate) mod transaction;
mod trie;

#[cfg(feature = "aggregate_bloom")]
use event::RunningEventFilter;
pub use event::{
EmittedEvent,
EventFilter,
@@ -29,31 +31,28 @@ pub(crate) use reorg_counter::ReorgCounter;
pub use rusqlite::TransactionBehavior;
pub use trie::{Node, NodeRef, RootIndexUpdate, StoredNode, TrieUpdate};

#[cfg(feature = "aggregate_bloom")]
use crate::bloom::AggregateBloom;

type PooledConnection = r2d2::PooledConnection<r2d2_sqlite::SqliteConnectionManager>;

pub struct Connection {
connection: PooledConnection,
bloom_filter_cache: Arc<crate::bloom::Cache>,
#[cfg(feature = "aggregate_bloom")]
running_aggregate: Arc<Mutex<AggregateBloom>>,
running_event_filter: Arc<Mutex<RunningEventFilter>>,
trie_prune_mode: TriePruneMode,
}

impl Connection {
pub(crate) fn new(
connection: PooledConnection,
bloom_filter_cache: Arc<crate::bloom::Cache>,
#[cfg(feature = "aggregate_bloom")] running_aggregate: Arc<Mutex<AggregateBloom>>,
#[cfg(feature = "aggregate_bloom")] running_event_filter: Arc<Mutex<RunningEventFilter>>,
trie_prune_mode: TriePruneMode,
) -> Self {
Self {
connection,
bloom_filter_cache,
#[cfg(feature = "aggregate_bloom")]
running_aggregate,
running_event_filter,
trie_prune_mode,
}
}
@@ -64,7 +63,7 @@ impl Connection {
transaction: tx,
bloom_filter_cache: self.bloom_filter_cache.clone(),
#[cfg(feature = "aggregate_bloom")]
running_aggregate: self.running_aggregate.clone(),
running_event_filter: self.running_event_filter.clone(),
trie_prune_mode: self.trie_prune_mode,
})
}
@@ -78,7 +77,7 @@ impl Connection {
transaction: tx,
bloom_filter_cache: self.bloom_filter_cache.clone(),
#[cfg(feature = "aggregate_bloom")]
running_aggregate: self.running_aggregate.clone(),
running_event_filter: self.running_event_filter.clone(),
trie_prune_mode: self.trie_prune_mode,
})
}
Expand All @@ -88,7 +87,7 @@ pub struct Transaction<'inner> {
transaction: rusqlite::Transaction<'inner>,
bloom_filter_cache: Arc<crate::bloom::Cache>,
#[cfg(feature = "aggregate_bloom")]
running_aggregate: Arc<Mutex<AggregateBloom>>,
running_event_filter: Arc<Mutex<RunningEventFilter>>,
trie_prune_mode: TriePruneMode,
}

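The rename from `running_aggregate: Arc<Mutex<AggregateBloom>>` to `running_event_filter: Arc<Mutex<RunningEventFilter>>` keeps the same sharing scheme: one mutex-guarded value per storage instance, cloned via `Arc` into every `Connection` and `Transaction` so all writers append to a single running filter. A stripped-down sketch of that ownership pattern; the struct contents are placeholders, not the real `RunningEventFilter`:

```rust
use std::sync::{Arc, Mutex};

// Placeholder: the real RunningEventFilter's fields are not part of this diff.
struct RunningEventFilter;

struct Connection {
    running_event_filter: Arc<Mutex<RunningEventFilter>>,
}

struct Transaction<'a> {
    running_event_filter: Arc<Mutex<RunningEventFilter>>,
    // Stands in for the borrowed rusqlite::Transaction<'inner>.
    _marker: std::marker::PhantomData<&'a ()>,
}

impl Connection {
    fn transaction(&mut self) -> Transaction<'_> {
        Transaction {
            // Cheap Arc clone: every transaction sees the same running filter.
            running_event_filter: Arc::clone(&self.running_event_filter),
            _marker: std::marker::PhantomData,
        }
    }
}
```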