-
Notifications
You must be signed in to change notification settings - Fork 248
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
backend(fix): Remove only finalized blocks from the event window #1356
Changes from 3 commits
d3d842a
eca074a
30b66b6
ef0a32c
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -7,7 +7,7 @@ use crate::backend::unstable::rpc_methods::{FollowEvent, Initialized, RuntimeEve | |
use crate::config::BlockHash; | ||
use crate::error::Error; | ||
use futures::stream::{Stream, StreamExt}; | ||
use std::collections::{HashMap, VecDeque}; | ||
use std::collections::{HashMap, HashSet, VecDeque}; | ||
use std::ops::DerefMut; | ||
use std::pin::Pin; | ||
use std::sync::{Arc, Mutex}; | ||
|
@@ -165,9 +165,8 @@ struct SharedState<Hash: BlockHash> { | |
done: bool, | ||
next_id: usize, | ||
subscribers: HashMap<usize, SubscriberDetails<Hash>>, | ||
// Keep a buffer of all events from last finalized block so that new | ||
// subscriptions can be handed this info first. | ||
block_events_from_last_finalized: VecDeque<FollowEvent<BlockRef<Hash>>>, | ||
/// Keep a buffer of all events that should be handed to a new subscription. | ||
block_events_for_new_subscriptions: VecDeque<FollowEvent<BlockRef<Hash>>>, | ||
// Keep track of the subscription ID we send out on new subs. | ||
current_subscription_id: Option<String>, | ||
// Keep track of the init message we send out on new subs. | ||
|
@@ -186,7 +185,7 @@ impl<Hash: BlockHash> Default for Shared<Hash> { | |
current_init_message: None, | ||
current_subscription_id: None, | ||
seen_runtime_events: HashMap::new(), | ||
block_events_from_last_finalized: VecDeque::new(), | ||
block_events_for_new_subscriptions: VecDeque::new(), | ||
}))) | ||
} | ||
} | ||
|
@@ -251,7 +250,7 @@ impl<Hash: BlockHash> Shared<Hash> { | |
// New subscriptions will be given this init message: | ||
shared.current_init_message = Some(ev.clone()); | ||
// Clear block cache (since a new finalized block hash is seen): | ||
shared.block_events_from_last_finalized.clear(); | ||
shared.block_events_for_new_subscriptions.clear(); | ||
} | ||
FollowStreamMsg::Event(FollowEvent::Finalized(finalized_ev)) => { | ||
// Update the init message that we'll hand out to new subscriptions. If the init message | ||
|
@@ -275,8 +274,26 @@ impl<Hash: BlockHash> Shared<Hash> { | |
} | ||
} | ||
|
||
// New finalized block, so clear the cache of older block events. | ||
shared.block_events_from_last_finalized.clear(); | ||
// The blocks reported by the finalized event should not be reported again | ||
// by the initialized event. The pruned blocks are not of interest. | ||
let to_remove: HashSet<Hash> = finalized_ev | ||
.finalized_block_hashes | ||
.iter() | ||
.chain(finalized_ev.pruned_block_hashes.iter()) | ||
.map(|h| h.hash()) | ||
.collect(); | ||
|
||
shared | ||
.block_events_for_new_subscriptions | ||
.retain(|ev| match ev { | ||
FollowEvent::NewBlock(new_block_ev) => { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can you add a comment here why on
|
||
!to_remove.contains(&new_block_ev.block_hash.hash()) | ||
} | ||
FollowEvent::BestBlockChanged(best_block_ev) => { | ||
!to_remove.contains(&best_block_ev.best_block_hash.hash()) | ||
} | ||
_ => true, | ||
}); | ||
} | ||
FollowStreamMsg::Event(FollowEvent::NewBlock(new_block_ev)) => { | ||
// If a new runtime is seen, note it so that when a block is finalized, we | ||
|
@@ -288,15 +305,15 @@ impl<Hash: BlockHash> Shared<Hash> { | |
} | ||
|
||
shared | ||
.block_events_from_last_finalized | ||
.block_events_for_new_subscriptions | ||
.push_back(FollowEvent::NewBlock(new_block_ev)); | ||
} | ||
FollowStreamMsg::Event(ev @ FollowEvent::BestBlockChanged(_)) => { | ||
shared.block_events_from_last_finalized.push_back(ev); | ||
shared.block_events_for_new_subscriptions.push_back(ev); | ||
} | ||
FollowStreamMsg::Event(FollowEvent::Stop) => { | ||
// On a stop event, clear everything. Wait for resubscription and new ready/initialised events. | ||
shared.block_events_from_last_finalized.clear(); | ||
shared.block_events_for_new_subscriptions.clear(); | ||
shared.current_subscription_id = None; | ||
shared.current_init_message = None; | ||
} | ||
|
@@ -334,7 +351,7 @@ impl<Hash: BlockHash> Shared<Hash> { | |
init_msg.clone(), | ||
))); | ||
} | ||
for ev in &shared.block_events_from_last_finalized { | ||
for ev in &shared.block_events_for_new_subscriptions { | ||
local_items.push_back(FollowStreamMsg::Event(ev.clone())); | ||
} | ||
|
||
|
@@ -485,4 +502,41 @@ mod test { | |
]; | ||
assert_eq!(evs, expected); | ||
} | ||
|
||
#[tokio::test] | ||
async fn subscribers_receive_new_blocks_before_subscribing() { | ||
let mut driver = test_follow_stream_driver_getter( | ||
|| { | ||
[ | ||
Ok(ev_initialized(0)), | ||
Ok(ev_new_block(0, 1)), | ||
Ok(ev_best_block(1)), | ||
Ok(ev_new_block(1, 2)), | ||
Ok(ev_new_block(2, 3)), | ||
Ok(ev_finalized([1])), | ||
Err(Error::Other("ended".to_owned())), | ||
] | ||
}, | ||
10, | ||
); | ||
|
||
// Skip to the first finalized block F1. | ||
let _r = driver.next().await.unwrap(); | ||
let _i0 = driver.next().await.unwrap(); | ||
let _n1 = driver.next().await.unwrap(); | ||
let _b1 = driver.next().await.unwrap(); | ||
let _n2 = driver.next().await.unwrap(); | ||
let _n3 = driver.next().await.unwrap(); | ||
let _f1 = driver.next().await.unwrap(); | ||
|
||
// THEN subscribe; and make sure new block 1 and 2 are received. | ||
let evs: Vec<_> = driver.handle().subscribe().take(4).collect().await; | ||
let expected = vec![ | ||
FollowStreamMsg::Ready("sub_id_0".into()), | ||
FollowStreamMsg::Event(ev_initialized_ref(1)), | ||
FollowStreamMsg::Event(ev_new_block_ref(1, 2)), | ||
FollowStreamMsg::Event(ev_new_block_ref(2, 3)), | ||
]; | ||
assert_eq!(evs, expected); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yeah, alright when
Initialized
event is generated we can conclude that no messages regarding the next block are missed.