Skip to content

Commit

Permalink
backend: Detect chainHead stream termination and stop polling
Browse files Browse the repository at this point in the history
Signed-off-by: Alexandru Vasile <[email protected]>
  • Loading branch information
lexnv committed Jan 10, 2024
1 parent b3a4675 commit d9169e2
Showing 1 changed file with 98 additions and 40 deletions.
138 changes: 98 additions & 40 deletions subxt/src/backend/unstable/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -553,6 +553,8 @@ impl<T: Config + Send + Sync + 'static> Backend<T> for UnstableBackend<T> {
let mut mem_log = vec![];
let mut loop_times = 0;

let mut should_poll_blocks = true;

// Now we can attempt to associate tx events with pinned blocks.
let tx_stream = futures::stream::poll_fn(move |cx| {
loop {
Expand All @@ -575,52 +577,108 @@ impl<T: Config + Send + Sync + 'static> Backend<T> for UnstableBackend<T> {
}

// Make a note of new or finalized blocks that have come in since we started the TX.
if let Poll::Ready(Some(seen_block)) = seen_blocks_sub.poll_next_unpin(cx) {
match seen_block {
SeenBlock::New((block_ref, parent)) => {
// Optimization: once we have a `finalized_hash`, we only care about finalized
// block refs now and can avoid bothering to save new blocks.
// if finalized_hash.is_none() {
seen_blocks.insert(
block_ref.hash(),
(
SeenBlockMarker::New,
block_ref,
parent,
Some(now.elapsed()),
None,
),
);
// }
}
SeenBlock::Finalized(block_refs) => {
for block_ref in block_refs {
// if !seen_blocks.contains_key(&block_ref.hash()) {
// panic!("Finalized before new Finalized {:#?}\n initBlock {:#?} \nMEMLOG{:#?}\n SeenBlocks{:#?} \n Other{:#?}", block_ref.hash(), unsafe { &FIN_BLOCK }, mem_log, seen_blocks, seen_other);
// }

let entry = seen_blocks.entry(block_ref.hash()).or_insert((
SeenBlockMarker::Finalized,
block_ref.clone(),
block_ref,
None,
Some(now.elapsed()),
));
entry.0 = SeenBlockMarker::Finalized;
entry.4 = Some(now.elapsed());

// .get_mut(&block_ref.hash())
// .expect("finalized block seen before new block")
// .0 = SeenBlockMarker::Finalized;
if should_poll_blocks {
match seen_blocks_sub.poll_next_unpin(cx) {
Poll::Ready(Some(seen_block)) => {
match seen_block {
SeenBlock::New((block_ref, parent)) => {
// Optimization: once we have a `finalized_hash`, we only care about finalized
// block refs now and can avoid bothering to save new blocks.
// if finalized_hash.is_none() {
seen_blocks.insert(
block_ref.hash(),
(
SeenBlockMarker::New,
block_ref,
parent,
Some(now.elapsed()),
None,
),
);
// }
}
SeenBlock::Finalized(block_refs) => {
for block_ref in block_refs {
// if !seen_blocks.contains_key(&block_ref.hash()) {
// panic!("Finalized before new Finalized {:#?}\n initBlock {:#?} \nMEMLOG{:#?}\n SeenBlocks{:#?} \n Other{:#?}", block_ref.hash(), unsafe { &FIN_BLOCK }, mem_log, seen_blocks, seen_other);
// }

let entry =
seen_blocks.entry(block_ref.hash()).or_insert((
SeenBlockMarker::Finalized,
block_ref.clone(),
block_ref,
None,
Some(now.elapsed()),
));
entry.0 = SeenBlockMarker::Finalized;
entry.4 = Some(now.elapsed());

// .get_mut(&block_ref.hash())
// .expect("finalized block seen before new block")
// .0 = SeenBlockMarker::Finalized;
}
}
SeenBlock::Other(other) => {
seen_other.push((now.elapsed(), other));
}
}
continue;
}
SeenBlock::Other(other) => {
seen_other.push((now.elapsed(), other));
Poll::Ready(None) => {
should_poll_blocks = false;
println!(" seen_blocks_sub ended!");
}
Poll::Pending => (),
}
continue;
}

// if let Poll::Ready(Some(seen_block)) = seen_blocks_sub.poll_next_unpin(cx) {
// match seen_block {
// SeenBlock::New((block_ref, parent)) => {
// // Optimization: once we have a `finalized_hash`, we only care about finalized
// // block refs now and can avoid bothering to save new blocks.
// // if finalized_hash.is_none() {
// seen_blocks.insert(
// block_ref.hash(),
// (
// SeenBlockMarker::New,
// block_ref,
// parent,
// Some(now.elapsed()),
// None,
// ),
// );
// // }
// }
// SeenBlock::Finalized(block_refs) => {
// for block_ref in block_refs {
// // if !seen_blocks.contains_key(&block_ref.hash()) {
// // panic!("Finalized before new Finalized {:#?}\n initBlock {:#?} \nMEMLOG{:#?}\n SeenBlocks{:#?} \n Other{:#?}", block_ref.hash(), unsafe { &FIN_BLOCK }, mem_log, seen_blocks, seen_other);
// // }

// let entry = seen_blocks.entry(block_ref.hash()).or_insert((
// SeenBlockMarker::Finalized,
// block_ref.clone(),
// block_ref,
// None,
// Some(now.elapsed()),
// ));
// entry.0 = SeenBlockMarker::Finalized;
// entry.4 = Some(now.elapsed());

// // .get_mut(&block_ref.hash())
// // .expect("finalized block seen before new block")
// // .0 = SeenBlockMarker::Finalized;
// }
// }
// SeenBlock::Other(other) => {
// seen_other.push((now.elapsed(), other));
// }
// }
// continue;
// }

// If we have a finalized hash, we are done looking for tx events and we are just waiting
// for a pinned block with a matching hash (which must appear eventually given it's finalized).
if let Some(hash) = &finalized_hash {
Expand Down

0 comments on commit d9169e2

Please sign in to comment.