Skip to content

Commit

Permalink
fix: address PR feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
KolbyML committed Aug 24, 2024
1 parent 4f581fd commit 2ce7d59
Show file tree
Hide file tree
Showing 7 changed files with 176 additions and 138 deletions.
86 changes: 66 additions & 20 deletions e2store/src/era.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,28 @@ impl Era {
let slot_index_state = SlotIndexStateEntry::try_from(&file.entries[entries_length - 1])?;

// Iterate over the block entries. Skip the first and last 3 entries.
let mut next_slot = slot_index_block.slot_index.starting_slot;
for idx in 1..entries_length - 3 {
let entry = &file.entries[idx];
let fork = get_beacon_fork(next_slot);
let slot_indexes = slot_index_block
.slot_index
.indices
.iter()
.enumerate()
.filter_map(|(i, index)| {
if *index != 0 {
Some(slot_index_block.slot_index.starting_slot + i as u64)
} else {
None
}
})
.collect::<Vec<u64>>();

ensure!(
slot_indexes.len() == entries_length - 4,
"invalid slot index block: incorrect count"
);
for (index, slot) in slot_indexes.into_iter().enumerate() {
let entry = &file.entries[index + 1];
let fork = get_beacon_fork(slot);
let beacon_block = CompressedSignedBeaconBlock::try_from(entry, fork)?;
next_slot = beacon_block.block.slot() + 1;
blocks.push(beacon_block);
}
let fork = get_beacon_fork(slot_index_state.slot_index.starting_slot);
Expand All @@ -81,22 +97,41 @@ impl Era {
}

/// Iterate over beacon blocks.
pub fn iter_blocks(raw_era1: Vec<u8>) -> impl Iterator<Item = CompressedSignedBeaconBlock> {
let file = E2StoreMemory::deserialize(&raw_era1).expect("invalid era1 file");
pub fn iter_blocks(
raw_era: Vec<u8>,
) -> anyhow::Result<impl Iterator<Item = anyhow::Result<CompressedSignedBeaconBlock>>> {
let file = E2StoreMemory::deserialize(&raw_era)?;
let entries_length = file.entries.len();
let block_index = SlotIndexBlockEntry::try_from(&file.entries[entries_length - 2])
.expect("missing block index entry")
.slot_index;

let mut next_slot = block_index.starting_slot;
(1..entries_length - 3).map(move |idx| {
let entry: Entry = file.entries[idx].clone();
let fork = get_beacon_fork(next_slot);
let beacon_block =
CompressedSignedBeaconBlock::try_from(&entry, fork).expect("invalid block");
next_slot = beacon_block.block.slot() + 1;
beacon_block
})
let block_index =
SlotIndexBlockEntry::try_from(&file.entries[entries_length - 2])?.slot_index;

let slot_indexes = block_index
.indices
.iter()
.enumerate()
.filter_map(|(i, index)| {
if *index != 0 {
Some(block_index.starting_slot + i as u64)
} else {
None
}
})
.collect::<Vec<u64>>();

ensure!(
slot_indexes.len() == entries_length - 4,
"invalid slot index block: incorrect count"
);

Ok(slot_indexes
.into_iter()
.enumerate()
.map(move |(index, slot)| {
let entry: Entry = file.entries[index + 1].clone();
let fork = get_beacon_fork(slot);
let beacon_block = CompressedSignedBeaconBlock::try_from(&entry, fork)?;
Ok(beacon_block)
}))
}

#[allow(dead_code)]
Expand All @@ -121,6 +156,17 @@ impl Era {
file.write(&mut buf)?;
Ok(buf)
}

pub fn contains(&self, block_number: u64) -> bool {
if self.blocks.is_empty() {
return false;
}
let first_block_number = self.blocks[0].block.execution_block_number();
let last_block_number = self.blocks[self.blocks.len() - 1]
.block
.execution_block_number();
(first_block_number..=last_block_number).contains(&block_number)
}
}

#[derive(Clone, PartialEq, Debug)]
Expand Down
18 changes: 13 additions & 5 deletions e2store/src/utils.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
use std::io;
use std::{collections::HashMap, io};

use anyhow::{anyhow, ensure, Error};
use rand::{seq::SliceRandom, thread_rng};
use scraper::{Html, Selector};
use surf::Client;

const ERA1_DIR_URL: &str = "https://era1.ethportal.net/";
const ERA1_FILE_COUNT: usize = 1897;
pub const ERA1_FILE_COUNT: usize = 1897;

/// Fetches era1 files hosted on era1.ethportal.net and shuffles them
pub async fn get_shuffled_era1_files(http_client: &Client) -> anyhow::Result<Vec<String>> {
Expand Down Expand Up @@ -51,22 +51,30 @@ pub fn underlying_io_error_kind(error: &Error) -> Option<io::ErrorKind> {
const ERA_DIR_URL: &str = "https://mainnet.era.nimbus.team/";

/// Fetches era file download links
pub async fn get_era_file_download_links(http_client: &Client) -> anyhow::Result<Vec<String>> {
pub async fn get_era_file_download_links(
http_client: &Client,
) -> anyhow::Result<HashMap<u64, String>> {
let index_html = http_client
.get(ERA_DIR_URL)
.recv_string()
.await
.map_err(|e| anyhow!("{e}"))?;
let index_html = Html::parse_document(&index_html);
let selector = Selector::parse("a[href*='mainnet-']").expect("to be able to parse selector");
let era_files: Vec<String> = index_html
let era_files: HashMap<u64, String> = index_html
.select(&selector)
.map(|element| {
let href = element
.value()
.attr("href")
.expect("to be able to get href");
format!("{ERA_DIR_URL}{href}")
let epoch_index = href
.split('-')
.nth(1)
.expect("to be able to get epoch")
.parse::<u64>()
.expect("to be able to parse epoch");
(epoch_index, format!("{ERA_DIR_URL}{href}"))
})
.collect();
Ok(era_files)
Expand Down
68 changes: 41 additions & 27 deletions trin-execution/src/era/beacon.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@ use alloy_rlp::Decodable;
use ethportal_api::{
consensus::{
beacon_block::{
SignedBeaconBlockBellatrix, SignedBeaconBlockCapella, SignedBeaconBlockDeneb,
SignedBeaconBlock, SignedBeaconBlockBellatrix, SignedBeaconBlockCapella,
SignedBeaconBlockDeneb,
},
body::Transactions,
},
Expand All @@ -18,12 +19,22 @@ use super::types::{ProcessedBlock, TransactionsWithSender};
const EMPTY_UNCLE_ROOT_HASH: B256 =
b256!("1dcc4de8dec75d7aab85b567b6ccd41ad312451b948a7413f0a142fd40d49347");

pub trait BeaconBody {
fn process_beacon_block(&self) -> ProcessedBlock;
pub trait ProcessBeaconBlock {
fn process_beacon_block(&self) -> anyhow::Result<ProcessedBlock>;
}

impl BeaconBody for SignedBeaconBlockBellatrix {
fn process_beacon_block(&self) -> ProcessedBlock {
impl ProcessBeaconBlock for SignedBeaconBlock {
fn process_beacon_block(&self) -> anyhow::Result<ProcessedBlock> {
match self {
SignedBeaconBlock::Bellatrix(block) => block.process_beacon_block(),
SignedBeaconBlock::Capella(block) => block.process_beacon_block(),
SignedBeaconBlock::Deneb(block) => block.process_beacon_block(),
}
}
}

impl ProcessBeaconBlock for SignedBeaconBlockBellatrix {
fn process_beacon_block(&self) -> anyhow::Result<ProcessedBlock> {
let payload = &self.message.body.execution_payload;
let header = Header {
parent_hash: payload.parent_hash,
Expand All @@ -49,16 +60,16 @@ impl BeaconBody for SignedBeaconBlockBellatrix {
parent_beacon_block_root: None,
};

ProcessedBlock {
Ok(ProcessedBlock {
header: header.clone(),
uncles: None,
transactions: process_transactions(&payload.transactions),
}
transactions: process_transactions(&payload.transactions)?,
})
}
}

impl BeaconBody for SignedBeaconBlockCapella {
fn process_beacon_block(&self) -> ProcessedBlock {
impl ProcessBeaconBlock for SignedBeaconBlockCapella {
fn process_beacon_block(&self) -> anyhow::Result<ProcessedBlock> {
let payload = &self.message.body.execution_payload;
let header = Header {
parent_hash: payload.parent_hash,
Expand All @@ -84,16 +95,16 @@ impl BeaconBody for SignedBeaconBlockCapella {
parent_beacon_block_root: None,
};

ProcessedBlock {
Ok(ProcessedBlock {
header: header.clone(),
uncles: None,
transactions: process_transactions(&payload.transactions),
}
transactions: process_transactions(&payload.transactions)?,
})
}
}

impl BeaconBody for SignedBeaconBlockDeneb {
fn process_beacon_block(&self) -> ProcessedBlock {
impl ProcessBeaconBlock for SignedBeaconBlockDeneb {
fn process_beacon_block(&self) -> anyhow::Result<ProcessedBlock> {
let payload = &self.message.body.execution_payload;
let header = Header {
parent_hash: payload.parent_hash,
Expand All @@ -119,26 +130,29 @@ impl BeaconBody for SignedBeaconBlockDeneb {
parent_beacon_block_root: None,
};

ProcessedBlock {
Ok(ProcessedBlock {
header: header.clone(),
uncles: None,
transactions: process_transactions(&payload.transactions),
}
transactions: process_transactions(&payload.transactions)?,
})
}
}

fn process_transactions(transactions: &Transactions) -> Vec<TransactionsWithSender> {
fn process_transactions(
transactions: &Transactions,
) -> anyhow::Result<Vec<TransactionsWithSender>> {
transactions
.into_par_iter()
.map(|raw_tx| {
Transaction::decode_enveloped_transactions(&mut raw_tx.to_vec().as_slice())
.expect("We should always be able to decode the enveloped transactions of a block")
})
.map(|tx| TransactionsWithSender {
sender_address: tx
let transaction =
Transaction::decode_enveloped_transactions(&mut raw_tx.to_vec().as_slice())
.map_err(|err| anyhow::anyhow!("Failed decoding transaction rlp: {err:?}"))?;
transaction
.get_transaction_sender_address()
.expect("We should always be able to get the sender address of a transaction"),
transaction: tx,
.map(|sender_address| TransactionsWithSender {
sender_address,
transaction,
})
})
.collect()
.collect::<anyhow::Result<Vec<_>>>()
}
63 changes: 17 additions & 46 deletions trin-execution/src/era/binary_search.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use e2store::{
e2store::types::{Entry, Header as E2StoreHeader},
era::{get_beacon_fork, CompressedSignedBeaconBlock, Era, SlotIndexStateEntry},
era::{get_beacon_fork, CompressedSignedBeaconBlock, Era},
utils::get_era_file_download_links,
};
use surf::Client;
Expand All @@ -26,26 +26,29 @@ impl EraBinarySearch {
block_number: u64,
) -> anyhow::Result<ProcessedEra> {
let era_links = get_era_file_download_links(&http_client).await?;
let last_epoch_index = era_links.len() - 1;
let mut start_epoch_index = FIRST_ERA_EPOCH_WITH_EXECUTION_PAYLOAD;
let mut end_epoch_index = era_links[era_links.len() - 1]
.split('-')
.nth(1)
.expect("to be able to get epoch index")
.parse::<u64>()
.expect("to be able to parse epoch index");
let mut end_epoch_index =
*era_links.keys().max().expect("Getting max shouldn't fail") as u64;

while start_epoch_index <= end_epoch_index {
let mid = end_epoch_index + (start_epoch_index - end_epoch_index) / 2;
let mid_block = EraBinarySearch::download_first_beacon_block_from_era(
era_links[mid as usize].clone(),
mid,
era_links[&mid].clone(),
http_client.clone(),
)
.await?;
let mid_block_number = mid_block.block.execution_block_number();

if mid + 1 > last_epoch_index as u64 {
return Err(anyhow::anyhow!("Block not found in any era file"));
}

// minimize the worse case by fetching the first block number of the after mid .era
let mid_plus_one_block = EraBinarySearch::download_first_beacon_block_from_era(
era_links[mid as usize + 1].clone(),
mid + 1,
era_links[&(mid + 1)].clone(),
http_client.clone(),
)
.await?;
Expand All @@ -59,16 +62,12 @@ impl EraBinarySearch {
|| (mid_block_number..mid_plus_one_block_number).contains(&block_number)
{
let era_to_check =
download_raw_era(era_links[mid as usize].clone(), http_client.clone()).await?;
download_raw_era(era_links[&(mid)].clone(), http_client.clone()).await?;

let decoded_era = Era::deserialize(&era_to_check)?;
if EraBinarySearch::does_era_contain_block(&decoded_era, block_number) {
if decoded_era.contains(block_number) {
return process_era_file(era_to_check, mid);
}

if mid + 1 > end_epoch_index {
return Err(anyhow::anyhow!("Block not found in any era file"));
}
}

if mid_block_number < block_number {
Expand All @@ -81,16 +80,8 @@ impl EraBinarySearch {
Err(anyhow::anyhow!("Block not found in any era file"))
}

fn does_era_contain_block(era: &Era, block_number: u64) -> bool {
for block in &era.blocks {
if block.block.execution_block_number() == block_number {
return true;
}
}
false
}

async fn download_first_beacon_block_from_era(
era_index: u64,
era_path: String,
http_client: Client,
) -> anyhow::Result<CompressedSignedBeaconBlock> {
Expand All @@ -110,28 +101,8 @@ impl EraBinarySearch {
.expect("to be able to download compressed beacon block");
let entry = Entry::deserialize(&compressed_beacon_block)?;

// download slot_index_state to get the starting slot so we get the correct fork
let response = surf::head(&era_path).await.expect("to be able to get head");
let content_length = response
.header("Content-Length")
.map(|lengths| lengths.get(0))
.expect("to be able to get content length")
.map(|length| length.as_str().parse::<u64>())
.expect("to be able to parse content length")
.expect("to be able to get content length");
let slot_index_state = http_client
.get(&era_path)
.header(
"Range",
format!("bytes={}-{}", content_length - 32, content_length),
)
.recv_bytes()
.await
.expect("to be able to download compressed beacon block");

let slot_index_state = Entry::deserialize(&slot_index_state)?;
let slot_index_state = SlotIndexStateEntry::try_from(&slot_index_state)?;
let fork = get_beacon_fork(slot_index_state.slot_index.starting_slot);
let slot_index = (era_index - 1) * 8192;
let fork = get_beacon_fork(slot_index);

let beacon_block = CompressedSignedBeaconBlock::try_from(&entry, fork)?;

Expand Down
Loading

0 comments on commit 2ce7d59

Please sign in to comment.