Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

backend(fix): Remove only finalized blocks from the event window #1356

Merged
merged 4 commits into from
Jan 11, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 70 additions & 12 deletions subxt/src/backend/unstable/follow_stream_driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use crate::backend::unstable::rpc_methods::{FollowEvent, Initialized, RuntimeEve
use crate::config::BlockHash;
use crate::error::Error;
use futures::stream::{Stream, StreamExt};
use std::collections::{HashMap, VecDeque};
use std::collections::{HashMap, HashSet, VecDeque};
use std::ops::DerefMut;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
Expand Down Expand Up @@ -165,9 +165,8 @@ struct SharedState<Hash: BlockHash> {
done: bool,
next_id: usize,
subscribers: HashMap<usize, SubscriberDetails<Hash>>,
// Keep a buffer of all events from last finalized block so that new
// subscriptions can be handed this info first.
block_events_from_last_finalized: VecDeque<FollowEvent<BlockRef<Hash>>>,
/// Keep a buffer of all events that should be handed to a new subscription.
block_events_for_new_subscriptions: VecDeque<FollowEvent<BlockRef<Hash>>>,
// Keep track of the subscription ID we send out on new subs.
current_subscription_id: Option<String>,
// Keep track of the init message we send out on new subs.
Expand All @@ -186,7 +185,7 @@ impl<Hash: BlockHash> Default for Shared<Hash> {
current_init_message: None,
current_subscription_id: None,
seen_runtime_events: HashMap::new(),
block_events_from_last_finalized: VecDeque::new(),
block_events_for_new_subscriptions: VecDeque::new(),
})))
}
}
Expand Down Expand Up @@ -251,7 +250,7 @@ impl<Hash: BlockHash> Shared<Hash> {
// New subscriptions will be given this init message:
shared.current_init_message = Some(ev.clone());
// Clear block cache (since a new finalized block hash is seen):
shared.block_events_from_last_finalized.clear();
shared.block_events_for_new_subscriptions.clear();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah, alright when Initialized event is generated we can conclude that no messages regarding the next block are missed.

}
FollowStreamMsg::Event(FollowEvent::Finalized(finalized_ev)) => {
// Update the init message that we'll hand out to new subscriptions. If the init message
Expand All @@ -275,8 +274,30 @@ impl<Hash: BlockHash> Shared<Hash> {
}
}

// New finalized block, so clear the cache of older block events.
shared.block_events_from_last_finalized.clear();
// The last finalized block will be reported as Initialized by our driver,
// therefore there is no need to report NewBlock and BestBlock events for it.
// If the Finalized event reported multiple finalized hashes, we only care about
// the state at the head of the chain, therefore it is correct to remove those as well.
// Idem for the pruned hashes; they will never be reported again and we remove
// them from the window of events.
let to_remove: HashSet<Hash> = finalized_ev
.finalized_block_hashes
.iter()
.chain(finalized_ev.pruned_block_hashes.iter())
.map(|h| h.hash())
.collect();

shared
.block_events_for_new_subscriptions
.retain(|ev| match ev {
FollowEvent::NewBlock(new_block_ev) => {
Copy link
Member

@niklasad1 niklasad1 Jan 11, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add a comment here why on NewBlock and BestBlockChanged are removed here if it's pruned or finalized?

The last finalized block will be reported as Initialized by our driver, therefore there is no need to report NewBlock and BestBlock events for it. If the Finalized event reported multiple finalized hashes, we only care about the state at the head of the chain, therefore it is correct to remove those as well. Idem for the pruned hashes; they will never be reported again and we remove them from the window of events.

!to_remove.contains(&new_block_ev.block_hash.hash())
}
FollowEvent::BestBlockChanged(best_block_ev) => {
!to_remove.contains(&best_block_ev.best_block_hash.hash())
}
_ => true,
});
}
FollowStreamMsg::Event(FollowEvent::NewBlock(new_block_ev)) => {
// If a new runtime is seen, note it so that when a block is finalized, we
Expand All @@ -288,15 +309,15 @@ impl<Hash: BlockHash> Shared<Hash> {
}

shared
.block_events_from_last_finalized
.block_events_for_new_subscriptions
.push_back(FollowEvent::NewBlock(new_block_ev));
}
FollowStreamMsg::Event(ev @ FollowEvent::BestBlockChanged(_)) => {
shared.block_events_from_last_finalized.push_back(ev);
shared.block_events_for_new_subscriptions.push_back(ev);
}
FollowStreamMsg::Event(FollowEvent::Stop) => {
// On a stop event, clear everything. Wait for resubscription and new ready/initialised events.
shared.block_events_from_last_finalized.clear();
shared.block_events_for_new_subscriptions.clear();
shared.current_subscription_id = None;
shared.current_init_message = None;
}
Expand Down Expand Up @@ -334,7 +355,7 @@ impl<Hash: BlockHash> Shared<Hash> {
init_msg.clone(),
)));
}
for ev in &shared.block_events_from_last_finalized {
for ev in &shared.block_events_for_new_subscriptions {
local_items.push_back(FollowStreamMsg::Event(ev.clone()));
}

Expand Down Expand Up @@ -485,4 +506,41 @@ mod test {
];
assert_eq!(evs, expected);
}

#[tokio::test]
async fn subscribers_receive_new_blocks_before_subscribing() {
let mut driver = test_follow_stream_driver_getter(
|| {
[
Ok(ev_initialized(0)),
Ok(ev_new_block(0, 1)),
Ok(ev_best_block(1)),
Ok(ev_new_block(1, 2)),
Ok(ev_new_block(2, 3)),
Ok(ev_finalized([1])),
Err(Error::Other("ended".to_owned())),
]
},
10,
);

// Skip to the first finalized block F1.
let _r = driver.next().await.unwrap();
let _i0 = driver.next().await.unwrap();
let _n1 = driver.next().await.unwrap();
let _b1 = driver.next().await.unwrap();
let _n2 = driver.next().await.unwrap();
let _n3 = driver.next().await.unwrap();
let _f1 = driver.next().await.unwrap();

// THEN subscribe; and make sure new block 1 and 2 are received.
let evs: Vec<_> = driver.handle().subscribe().take(4).collect().await;
let expected = vec![
FollowStreamMsg::Ready("sub_id_0".into()),
FollowStreamMsg::Event(ev_initialized_ref(1)),
FollowStreamMsg::Event(ev_new_block_ref(1, 2)),
FollowStreamMsg::Event(ev_new_block_ref(2, 3)),
];
assert_eq!(evs, expected);
}
}
Loading