Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: sidecar fetcher #7443

Merged
merged 38 commits into from
May 28, 2024
Merged
Changes from 1 commit
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
c7a13ac
example project base
JackG-eth Apr 2, 2024
6f834a0
WIP Message Types
JackG-eth Apr 3, 2024
e5e2352
error type
JackG-eth Apr 3, 2024
7a5502b
WIP
JackG-eth Apr 5, 2024
b89b692
stream type filled out
JackG-eth Apr 5, 2024
c9f2ca7
item variants
JackG-eth Apr 6, 2024
378a0c5
grab blob from pool
JackG-eth Apr 6, 2024
90860bc
bump v
JackG-eth Apr 17, 2024
750ddc7
bump, pending
JackG-eth Apr 19, 2024
3328c75
just blob decode left
JackG-eth Apr 19, 2024
68f3000
prototype1
JackG-eth Apr 19, 2024
7651a0f
cleanup
JackG-eth Apr 19, 2024
e6299c4
bt changes
JackG-eth Apr 24, 2024
e9bdda3
improved blobt logic
JackG-eth Apr 24, 2024
2e4a59d
Cleanup url
JackG-eth Apr 26, 2024
a4ed6c6
cleanup
JackG-eth Apr 26, 2024
985e93a
comments
JackG-eth Apr 26, 2024
ea335f7
remove unused deps
JackG-eth Apr 26, 2024
cc268d7
cargo logic
JackG-eth Apr 26, 2024
8b95b42
beacon types
JackG-eth Apr 30, 2024
0167a8e
json example
JackG-eth May 1, 2024
4b8842d
WIP sidecar iterator
JackG-eth May 10, 2024
3120d83
removing types as moved to beacon
JackG-eth May 10, 2024
8537f9f
example project base
JackG-eth Apr 2, 2024
921fbc8
rebase with main
JackG-eth May 10, 2024
c9a8198
update tomls
JackG-eth May 10, 2024
988e4a1
Merge branch 'paradigmxyz:main' into feature_sidecar_fetcher
JackG-eth May 12, 2024
fd5be2e
updating to latest
JackG-eth May 12, 2024
9b60ef3
WIP, Testing
JackG-eth May 12, 2024
8da1c94
sidecar tested
JackG-eth May 16, 2024
a17d01c
Merge branch 'paradigmxyz:main' into feature_sidecar_fetcher
JackG-eth May 16, 2024
8c07755
wip reorged
JackG-eth May 27, 2024
f890dee
adding changes
JackG-eth May 27, 2024
17d9112
nits
JackG-eth May 27, 2024
6d7c4d1
touchups
mattsse May 28, 2024
d055770
Merge branch 'main' into feature_sidecar_fetcher
mattsse May 28, 2024
2c38d90
use workspace reqwest
mattsse May 28, 2024
dd8e2f5
fix docs
mattsse May 28, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
120 changes: 51 additions & 69 deletions examples/beacon-api-sidecar-fetcher/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,18 @@

use std::{
collections::VecDeque,
net::{IpAddr, Ipv4Addr},
pin::Pin,
task::{Context, Poll},
};

use clap::Parser;
use thiserror::Error;

use futures_util::{stream::FuturesUnordered, Future, Stream, StreamExt};
use reqwest::{Error, StatusCode};
use reth::{
primitives::{alloy_primitives::TxHash, BlobTransactionSidecar, BlockHash, BlockNumber},
primitives::BlobTransaction,
providers::CanonStateNotification,
rpc::types::engine::BlobsBundleV1,
transaction_pool::{BlobStoreError, TransactionPoolExt},
Expand Down Expand Up @@ -52,16 +54,6 @@ pub enum SideCarError {
#[error("{0} Error: {1}")]
UnknownError(u16, String),
}

#[derive(Debug, Clone)]
pub struct MinedSidecar {
tx_hash: TxHash,
block_hash: BlockHash,
block_number: BlockNumber,
sidecar: BlobTransactionSidecar,
}

#[derive(Debug)]
pub struct MinedSidecarStream<St, P>
where
St: Stream<Item = CanonStateNotification> + Send + Unpin + 'static,
Expand All @@ -70,88 +62,80 @@ where
pool: P,
client: reqwest::Client,
pending_requests:
FuturesUnordered<Pin<Box<dyn Future<Output = Result<Vec<MinedSidecar>, SideCarError>>>>>, /* TODO make vec */
queued_actions: VecDeque<MinedSidecar>, /* Buffer for
* ready items */
FuturesUnordered<Pin<Box<dyn Future<Output = Result<Vec<BlobTransaction>, SideCarError>>>>>, /* TODO make vec */
queued_actions: VecDeque<BlobTransaction>, /* Buffer for
* ready items */
}

impl<St, P> Stream for MinedSidecarStream<St, P>
where
St: Stream<Item = CanonStateNotification> + Send + Unpin + 'static,
P: TransactionPoolExt + Unpin + 'static,
{
type Item = Result<MinedSidecar, SideCarError>;
type Item = Result<BlobTransaction, SideCarError>;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this: &mut MinedSidecarStream<St, P> = self.get_mut();

// return any buffered result
if let Some(mined_sidecar) = this.queued_actions.pop_front() {
return Poll::Ready(Some(Ok(mined_sidecar)));
}

// Check if any pending reqwests are ready and append to buffer
while let Poll::Ready(Some(pending_result)) = this.pending_requests.poll_next_unpin(cx) {
match pending_result {
Ok(mined_sidecars) => {
for sidecar in mined_sidecars {
this.queued_actions.push_back(sidecar);
}

// Try returning the next available sidecar if any
if let Some(sidecar) = this.queued_actions.pop_front() {
return Poll::Ready(Some(Ok(sidecar)));
// return any buffered result
loop {
// Check if any pending reqwests are ready and append to buffer
while let Poll::Ready(Some(pending_result)) = this.pending_requests.poll_next_unpin(cx)
{
match pending_result {
Ok(mined_sidecars) => {
for sidecar in mined_sidecars {
this.queued_actions.push_back(sidecar);
}
}
Err(err) => return Poll::Ready(Some(Err(err))),
}
Err(err) => return Poll::Ready(Some(Err(err))),
}
}

loop {
match this.events.poll_next_unpin(cx) {
Poll::Ready(Some(notification)) => {
// todo: Clea up all blobs avail logic
// todo: update error for transcations
// todo: CL connectivty logic
while let Poll::Ready(Some(notification)) = this.events.poll_next_unpin(cx) {
{
let notification_clone = notification.clone();
let mut all_blobs_available = true;
let mut actions_to_queue: Vec<MinedSidecar> = Vec::new();
let mut actions_to_queue: Vec<BlobTransaction> = Vec::new();

let txs: Vec<_> = notification
.tip()
.transactions()
.filter(|tx| tx.is_eip4844())
.map(|tx| {
// Clone minimal data necessary for later processing
(tx.hash, tx.blob_versioned_hashes().unwrap().len())
})
.map(|tx| (tx.clone(), tx.blob_versioned_hashes().unwrap().len()))
.collect();

for (tx_hash, _) in &txs {
match this.pool.get_blob(*tx_hash) {
Ok(Some(blob)) => {
actions_to_queue.push(MinedSidecar {
tx_hash: *tx_hash,
block_hash: notification.tip().block.hash(),
block_number: notification.tip().block.number,
sidecar: blob,
});
}
Ok(None) => {
all_blobs_available = false;
break;
}
Err(err) => {
return Poll::Ready(Some(Err(SideCarError::TransactionPoolError(
err,
))))
}
//returns
let sidecars = match this
.pool
.get_all_blobs_exact(txs.iter().map(|(tx, _)| tx.hash).collect())
{
Ok(blobs) => blobs,
Err(err) => {
all_blobs_available = false;
return Poll::Ready(Some(Err(SideCarError::TransactionPoolError(err))))
}
};

for ((tx, _), sidecar) in txs.iter().zip(sidecars.iter()) {
match BlobTransaction::try_from_signed(tx.clone(), sidecar.clone()) {
Ok(blob_transaction) => actions_to_queue.push(blob_transaction),
Err((transaction, sidecar)) => {
todo!()
}
};
}

if all_blobs_available {
this.queued_actions.extend(actions_to_queue);
} else {
let client_clone = this.client.clone();
let block_hash = notification.tip().block.hash();
let block_number = notification.tip().block.number;

let url = format!(
"http://{}/eth/v1/beacon/blob_sidecars/{}",
Expand Down Expand Up @@ -206,16 +190,16 @@ where
}
};

let sidecars: Vec<MinedSidecar> = txs
let sidecars: Vec<BlobTransaction> = txs
.iter()
.map(|(tx_hash, blob_len)| {
// Reuse txs without moving it
.map(|(tx, blob_len)| {
let sidecar = blobs_bundle.pop_sidecar(*blob_len);
MinedSidecar {
tx_hash: *tx_hash,
block_hash,
block_number,
sidecar: sidecar.into(),
match BlobTransaction::try_from_signed(
tx.clone(),
sidecar.into(),
) {
Ok(blob_transaction) => blob_transaction,
Err((transaction, sidecar)) => todo!(),
}
})
.collect();
Expand All @@ -227,8 +211,6 @@ where
this.pending_requests.push(query);
}
}
Poll::Ready(None) => return Poll::Ready(None),
Poll::Pending => continue,
}
}
}
Expand Down