From e935b1862f6c11cfbd3aa4e43ab09b9e092f4a18 Mon Sep 17 00:00:00 2001 From: Martin Beckmann Date: Tue, 29 Aug 2023 08:53:28 +0200 Subject: [PATCH 1/2] Move block stream logic into `ethrpc` --- Cargo.lock | 6 + crates/autopilot/Cargo.toml | 1 + .../database/ethflow_events/event_storing.rs | 3 +- crates/autopilot/src/database/events.rs | 3 +- .../src/database/onchain_order_events.rs | 6 +- .../onchain_order_events/ethflow_events.rs | 8 +- crates/autopilot/src/event_updater.rs | 2 +- crates/autopilot/src/lib.rs | 2 +- .../src/on_settlement_event_updater.rs | 10 +- crates/autopilot/src/run_loop.rs | 8 +- crates/autopilot/src/solvable_orders.rs | 2 +- .../src/boundary/liquidity/balancer/v2/mod.rs | 2 +- crates/driver/src/boundary/liquidity/mod.rs | 3 +- crates/driver/src/boundary/liquidity/swapr.rs | 7 +- .../src/boundary/liquidity/uniswap/v2.rs | 6 +- .../src/boundary/liquidity/uniswap/v3.rs | 2 +- crates/e2e/Cargo.toml | 1 + crates/e2e/tests/e2e/eth_flow.rs | 7 +- crates/e2e/tests/e2e/refunder.rs | 2 +- crates/ethrpc/Cargo.toml | 3 + crates/ethrpc/src/current_block/mod.rs | 321 ++++++++++++++++ crates/ethrpc/src/current_block/retriever.rs | 130 +++++++ crates/ethrpc/src/lib.rs | 1 + crates/refunder/Cargo.toml | 1 + crates/refunder/src/refund_service.rs | 8 +- .../shared/src/account_balances/arguments.rs | 3 +- crates/shared/src/account_balances/cached.rs | 9 +- crates/shared/src/arguments.rs | 6 +- crates/shared/src/contracts.rs | 8 +- crates/shared/src/current_block.rs | 358 +++--------------- crates/shared/src/current_block/arguments.rs | 55 --- crates/shared/src/event_handling.rs | 15 +- crates/shared/src/maintenance.rs | 2 +- crates/shared/src/price_estimation/http.rs | 3 +- crates/shared/src/recent_block_cache.rs | 16 +- .../src/sources/balancer_v2/pool_fetching.rs | 5 +- .../balancer_v2/pool_fetching/cache.rs | 2 +- .../balancer_v2/pool_fetching/pool_storage.rs | 2 +- .../balancer_v2/pool_fetching/registry.rs | 9 +- .../src/sources/uniswap_v2/pool_cache.rs | 2 +- .../src/sources/uniswap_v3/event_fetching.rs | 7 +- .../src/sources/uniswap_v3/pool_fetching.rs | 6 +- crates/solver/src/driver.rs | 3 +- 43 files changed, 596 insertions(+), 460 deletions(-) create mode 100644 crates/ethrpc/src/current_block/mod.rs create mode 100644 crates/ethrpc/src/current_block/retriever.rs diff --git a/Cargo.lock b/Cargo.lock index c91c2d171c..51347168b4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -254,6 +254,7 @@ dependencies = [ "database", "derivative", "ethcontract", + "ethrpc", "futures", "gas-estimation", "hex", @@ -1684,6 +1685,7 @@ dependencies = [ "database", "driver", "ethcontract", + "ethrpc", "futures", "hex", "hex-literal", @@ -1895,6 +1897,7 @@ dependencies = [ name = "ethrpc" version = "0.1.0" dependencies = [ + "anyhow", "async-trait", "contracts", "ethcontract", @@ -1906,6 +1909,7 @@ dependencies = [ "maplit", "mockall", "observe", + "primitive-types", "prometheus", "prometheus-metric-storage", "reqwest", @@ -1914,6 +1918,7 @@ dependencies = [ "serde_json", "testlib", "tokio", + "tokio-stream", "tracing", "web3", ] @@ -3480,6 +3485,7 @@ dependencies = [ "contracts", "database", "ethcontract", + "ethrpc", "futures", "gas-estimation", "lazy_static", diff --git a/crates/autopilot/Cargo.toml b/crates/autopilot/Cargo.toml index aa5e9fb276..5134c718a9 100644 --- a/crates/autopilot/Cargo.toml +++ b/crates/autopilot/Cargo.toml @@ -25,6 +25,7 @@ contracts = { path = "../contracts" } database = { path = "../database" } derivative = { workspace = true } ethcontract = { workspace = true } +ethrpc = { path = "../ethrpc" } futures = { workspace = true } gas-estimation = { workspace = true } observe = { path = "../observe" } diff --git a/crates/autopilot/src/database/ethflow_events/event_storing.rs b/crates/autopilot/src/database/ethflow_events/event_storing.rs index 71e303a7db..6031d4d4ad 100644 --- a/crates/autopilot/src/database/ethflow_events/event_storing.rs +++ b/crates/autopilot/src/database/ethflow_events/event_storing.rs @@ -4,7 +4,8 @@ use { crate::database::{events::bytes_to_order_uid, Postgres}, anyhow::Result, database::ethflow_orders::Refund, - shared::{current_block::RangeInclusive, event_handling::EventStoring}, + ethrpc::current_block::RangeInclusive, + shared::event_handling::EventStoring, }; fn get_refunds(events: Vec>) -> Result> { diff --git a/crates/autopilot/src/database/events.rs b/crates/autopilot/src/database/events.rs index 466ba24fc2..b87cb2734a 100644 --- a/crates/autopilot/src/database/events.rs +++ b/crates/autopilot/src/database/events.rs @@ -16,8 +16,9 @@ use { OrderUid, }, ethcontract::{Event as EthContractEvent, EventMetadata}, + ethrpc::current_block::RangeInclusive, number_conversions::u256_to_big_decimal, - shared::{current_block::RangeInclusive, event_handling::EventStoring}, + shared::event_handling::EventStoring, std::convert::TryInto, }; diff --git a/crates/autopilot/src/database/onchain_order_events.rs b/crates/autopilot/src/database/onchain_order_events.rs index 6b8a476ea5..27994980b7 100644 --- a/crates/autopilot/src/database/onchain_order_events.rs +++ b/crates/autopilot/src/database/onchain_order_events.rs @@ -21,6 +21,10 @@ use { PgTransaction, }, ethcontract::{Event as EthContractEvent, H160}, + ethrpc::{ + current_block::{timestamp_of_block_in_seconds, RangeInclusive}, + Web3, + }, futures::{stream, StreamExt}, itertools::multiunzip, model::{ @@ -31,14 +35,12 @@ use { }, number_conversions::u256_to_big_decimal, shared::{ - current_block::{timestamp_of_block_in_seconds, RangeInclusive}, db_order_conversions::{ buy_token_destination_into, order_kind_into, sell_token_source_into, signing_scheme_into, }, - ethrpc::Web3, event_handling::EventStoring, order_quoting::{OrderQuoting, Quote, QuoteSearchParameters}, order_validation::{ diff --git a/crates/autopilot/src/database/onchain_order_events/ethflow_events.rs b/crates/autopilot/src/database/onchain_order_events/ethflow_events.rs index 00f5c1ad15..ae8af8c5f4 100644 --- a/crates/autopilot/src/database/onchain_order_events/ethflow_events.rs +++ b/crates/autopilot/src/database/onchain_order_events/ethflow_events.rs @@ -15,12 +15,12 @@ use { PgTransaction, }, ethcontract::Event as EthContractEvent, - hex_literal::hex, - shared::{ - contracts::settlement_deployment_block_number_hash, + ethrpc::{ current_block::{block_number_to_block_number_hash, BlockNumberHash}, - ethrpc::Web3, + Web3, }, + hex_literal::hex, + shared::contracts::settlement_deployment_block_number_hash, sqlx::types::BigDecimal, std::{collections::HashMap, convert::TryInto}, }; diff --git a/crates/autopilot/src/event_updater.rs b/crates/autopilot/src/event_updater.rs index 90e0ee49e7..7b3e559c8f 100644 --- a/crates/autopilot/src/event_updater.rs +++ b/crates/autopilot/src/event_updater.rs @@ -1,8 +1,8 @@ use { anyhow::Result, contracts::gpv2_settlement, + ethrpc::current_block::{BlockNumberHash, BlockRetrieving}, shared::{ - current_block::{BlockNumberHash, BlockRetrieving}, event_handling::{EventHandler, EventRetrieving, EventStoring}, impl_event_retrieving, maintenance::Maintaining, diff --git a/crates/autopilot/src/lib.rs b/crates/autopilot/src/lib.rs index fd106b4300..3242a325da 100644 --- a/crates/autopilot/src/lib.rs +++ b/crates/autopilot/src/lib.rs @@ -15,6 +15,7 @@ use { }, contracts::{BalancerV2Vault, IUniswapV3Factory, WETH9}, ethcontract::{errors::DeployError, BlockNumber}, + ethrpc::current_block::block_number_to_block_number_hash, futures::StreamExt, model::DomainSeparator, shared::{ @@ -27,7 +28,6 @@ use { trace_call::TraceCallDetector, }, baseline_solver::BaseTokens, - current_block::block_number_to_block_number_hash, fee_subsidy::{config::FeeSubsidyConfiguration, FeeSubsidizing}, gas_price::InstrumentedGasEstimator, http_client::HttpClientFactory, diff --git a/crates/autopilot/src/on_settlement_event_updater.rs b/crates/autopilot/src/on_settlement_event_updater.rs index 2e5aefa771..b062016c89 100644 --- a/crates/autopilot/src/on_settlement_event_updater.rs +++ b/crates/autopilot/src/on_settlement_event_updater.rs @@ -41,13 +41,9 @@ use { anyhow::{anyhow, Context, Result}, contracts::GPv2Settlement, database::byte_array::ByteArray, + ethrpc::{current_block::CurrentBlockStream, Web3}, primitive_types::{H160, H256}, - shared::{ - current_block::CurrentBlockStream, - ethrpc::Web3, - event_handling::MAX_REORG_BLOCK_COUNT, - external_prices::ExternalPrices, - }, + shared::{event_handling::MAX_REORG_BLOCK_COUNT, external_prices::ExternalPrices}, sqlx::PgConnection, std::time::Duration, web3::types::{Transaction, TransactionId}, @@ -297,7 +293,7 @@ mod tests { database::clear_DANGER(&db.0).await.unwrap(); let transport = shared::ethrpc::create_env_test_transport(); let web3 = Web3::new(transport); - let current_block = shared::current_block::current_block_stream( + let current_block = ethrpc::current_block::current_block_stream( Arc::new(web3.clone()), Duration::from_secs(1), ) diff --git a/crates/autopilot/src/run_loop.rs b/crates/autopilot/src/run_loop.rs index eeecb9e92e..6a761f149d 100644 --- a/crates/autopilot/src/run_loop.rs +++ b/crates/autopilot/src/run_loop.rs @@ -14,6 +14,7 @@ use { anyhow::{anyhow, ensure, Context, Result}, chrono::Utc, database::order_events::OrderEventLabel, + ethrpc::{current_block::CurrentBlockStream, Web3}, itertools::Itertools, model::{ auction::{Auction, AuctionId}, @@ -29,12 +30,7 @@ use { }, primitive_types::{H160, H256}, rand::seq::SliceRandom, - shared::{ - current_block::CurrentBlockStream, - ethrpc::Web3, - event_handling::MAX_REORG_BLOCK_COUNT, - token_list::AutoUpdatingTokenList, - }, + shared::{event_handling::MAX_REORG_BLOCK_COUNT, token_list::AutoUpdatingTokenList}, std::{ collections::{BTreeMap, HashSet}, sync::Arc, diff --git a/crates/autopilot/src/solvable_orders.rs b/crates/autopilot/src/solvable_orders.rs index c28f918c92..184f4c9013 100644 --- a/crates/autopilot/src/solvable_orders.rs +++ b/crates/autopilot/src/solvable_orders.rs @@ -4,6 +4,7 @@ use { bigdecimal::BigDecimal, chrono::Utc, database::order_events::OrderEventLabel, + ethrpc::current_block::CurrentBlockStream, itertools::Itertools, model::{ auction::Auction, @@ -18,7 +19,6 @@ use { shared::{ account_balances::{BalanceFetching, Query}, bad_token::BadTokenDetecting, - current_block::CurrentBlockStream, price_estimation::native_price_cache::CachingNativePriceEstimator, remaining_amounts, signature_validator::{SignatureCheck, SignatureValidating}, diff --git a/crates/driver/src/boundary/liquidity/balancer/v2/mod.rs b/crates/driver/src/boundary/liquidity/balancer/v2/mod.rs index 4aeb87305d..00828a5ea4 100644 --- a/crates/driver/src/boundary/liquidity/balancer/v2/mod.rs +++ b/crates/driver/src/boundary/liquidity/balancer/v2/mod.rs @@ -17,8 +17,8 @@ use { BalancerV2WeightedPoolFactoryV3, GPv2Settlement, }, + ethrpc::current_block::{BlockRetrieving, CurrentBlockStream}, shared::{ - current_block::{BlockRetrieving, CurrentBlockStream}, http_solver::model::TokenAmount, sources::balancer_v2::{ pool_fetching::BalancerContracts, diff --git a/crates/driver/src/boundary/liquidity/mod.rs b/crates/driver/src/boundary/liquidity/mod.rs index 1a6e1bbe6c..956affc30e 100644 --- a/crates/driver/src/boundary/liquidity/mod.rs +++ b/crates/driver/src/boundary/liquidity/mod.rs @@ -5,12 +5,13 @@ use { infra::{self, blockchain::Ethereum}, }, anyhow::Result, + ethrpc::current_block::CurrentBlockStream, futures::future, itertools::Itertools, model::TokenPair, shared::{ baseline_solver::BaseTokens, - current_block::{self, CurrentBlockStream}, + current_block, http_client::HttpClientFactory, recent_block_cache::{self, CacheConfig}, }, diff --git a/crates/driver/src/boundary/liquidity/swapr.rs b/crates/driver/src/boundary/liquidity/swapr.rs index a0b36a2510..0039eebdef 100644 --- a/crates/driver/src/boundary/liquidity/swapr.rs +++ b/crates/driver/src/boundary/liquidity/swapr.rs @@ -4,9 +4,10 @@ use { domain::liquidity::{self, swapr}, infra::{self, blockchain::Ethereum}, }, - shared::{ - current_block::CurrentBlockStream, - sources::{swapr::reader::SwaprPoolReader, uniswap_v2::pool_fetching::DefaultPoolReader}, + ethrpc::current_block::CurrentBlockStream, + shared::sources::{ + swapr::reader::SwaprPoolReader, + uniswap_v2::pool_fetching::DefaultPoolReader, }, solver::{liquidity::ConstantProductOrder, liquidity_collector::LiquidityCollecting}, }; diff --git a/crates/driver/src/boundary/liquidity/uniswap/v2.rs b/crates/driver/src/boundary/liquidity/uniswap/v2.rs index 2738f2c209..f5671a3564 100644 --- a/crates/driver/src/boundary/liquidity/uniswap/v2.rs +++ b/crates/driver/src/boundary/liquidity/uniswap/v2.rs @@ -9,10 +9,12 @@ use { }, async_trait::async_trait, contracts::{GPv2Settlement, IUniswapLikeRouter}, - ethrpc::Web3, + ethrpc::{ + current_block::{self, CurrentBlockStream}, + Web3, + }, futures::StreamExt, shared::{ - current_block::{self, CurrentBlockStream}, http_solver::model::TokenAmount, maintenance::Maintaining, sources::uniswap_v2::{ diff --git a/crates/driver/src/boundary/liquidity/uniswap/v3.rs b/crates/driver/src/boundary/liquidity/uniswap/v3.rs index f1d09506e5..a6a251ff82 100644 --- a/crates/driver/src/boundary/liquidity/uniswap/v3.rs +++ b/crates/driver/src/boundary/liquidity/uniswap/v3.rs @@ -12,9 +12,9 @@ use { }, anyhow::Context, contracts::{GPv2Settlement, UniswapV3SwapRouter}, + ethrpc::current_block::BlockRetrieving, itertools::Itertools, shared::{ - current_block::BlockRetrieving, http_solver::model::TokenAmount, interaction::Interaction, sources::uniswap_v3::pool_fetching::UniswapV3PoolFetcher, diff --git a/crates/e2e/Cargo.toml b/crates/e2e/Cargo.toml index 19aad00dd5..40ad4a5871 100644 --- a/crates/e2e/Cargo.toml +++ b/crates/e2e/Cargo.toml @@ -15,6 +15,7 @@ contracts = { path = "../contracts" } database = { path = "../database" } driver = { path = "../driver" } ethcontract = { workspace = true } +ethrpc = { path = "../ethrpc" } hex = { workspace = true } hex-literal = { workspace = true } model = { path = "../model" } diff --git a/crates/e2e/tests/e2e/eth_flow.rs b/crates/e2e/tests/e2e/eth_flow.rs index 3164464f74..b19d9539c8 100644 --- a/crates/e2e/tests/e2e/eth_flow.rs +++ b/crates/e2e/tests/e2e/eth_flow.rs @@ -5,6 +5,7 @@ use { contracts::{CoWSwapEthFlow, ERC20Mintable, WETH9}, e2e::{nodes::local_node::TestNodeApi, setup::*, tx, tx_value}, ethcontract::{transaction::TransactionResult, Account, Bytes, H160, H256, U256}, + ethrpc::{current_block::timestamp_of_current_block_in_seconds, Web3}, hex_literal::hex, model::{ order::{ @@ -36,11 +37,7 @@ use { refund_service::{INVALIDATED_OWNER, NO_OWNER}, }, reqwest::Client, - shared::{ - current_block::timestamp_of_current_block_in_seconds, - ethrpc::Web3, - signature_validator::check_erc1271_result, - }, + shared::signature_validator::check_erc1271_result, }; const DAI_PER_ETH: u32 = 1_000; diff --git a/crates/e2e/tests/e2e/refunder.rs b/crates/e2e/tests/e2e/refunder.rs index 4d5077af40..5764ff87a1 100644 --- a/crates/e2e/tests/e2e/refunder.rs +++ b/crates/e2e/tests/e2e/refunder.rs @@ -3,9 +3,9 @@ use { chrono::{TimeZone, Utc}, e2e::{nodes::local_node::TestNodeApi, setup::*}, ethcontract::{H160, U256}, + ethrpc::{current_block::timestamp_of_current_block_in_seconds, Web3}, model::quote::{OrderQuoteRequest, OrderQuoteSide, QuoteSigningScheme, Validity}, refunder::refund_service::RefundService, - shared::{current_block::timestamp_of_current_block_in_seconds, ethrpc::Web3}, sqlx::PgPool, }; diff --git a/crates/ethrpc/Cargo.toml b/crates/ethrpc/Cargo.toml index ef22ad279a..e11a43b1a6 100644 --- a/crates/ethrpc/Cargo.toml +++ b/crates/ethrpc/Cargo.toml @@ -11,6 +11,7 @@ path = "src/lib.rs" doctest = false [dependencies] +anyhow = { workspace = true } async-trait = { workspace= true } ethereum-types = { workspace = true } futures = { workspace = true } @@ -19,6 +20,7 @@ hex-literal = { workspace = true } lazy_static = { workspace = true } mockall = { workspace = true } observe = { path = "../observe" } +primitive-types = { workspace = true } prometheus = { workspace = true } prometheus-metric-storage = { git = "https://github.com/cowprotocol/prometheus-metric-storage", tag = "v0.4.0" } reqwest = { workspace = true } @@ -26,6 +28,7 @@ scopeguard = "1" serde = { workspace = true } serde_json = { workspace = true } tokio = { workspace = true, features = [] } +tokio-stream = { version = "0.1", features = ["sync"] } web3 = { workspace = true } contracts = { path = "../contracts" } ethcontract = { workspace = true } diff --git a/crates/ethrpc/src/current_block/mod.rs b/crates/ethrpc/src/current_block/mod.rs new file mode 100644 index 0000000000..1be667da12 --- /dev/null +++ b/crates/ethrpc/src/current_block/mod.rs @@ -0,0 +1,321 @@ +pub mod retriever; + +use { + crate::Web3, + anyhow::{anyhow, ensure, Context as _, Result}, + primitive_types::H256, + std::{sync::Arc, time::Duration}, + tokio::sync::watch, + tokio_stream::wrappers::WatchStream, + tracing::Instrument, + web3::{ + helpers, + types::{Block, BlockId, BlockNumber, U64}, + BatchTransport, + Transport, + }, +}; + +pub type BlockNumberHash = (u64, H256); + +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct RangeInclusive { + start: T, + end: T, +} + +impl RangeInclusive { + pub fn try_new(start: T, end: T) -> Result { + ensure!(end >= start, "end has to be bigger or equal to start"); + Ok(Self { start, end }) + } + + pub fn start(&self) -> &T { + &self.start + } + + pub fn end(&self) -> &T { + &self.end + } + + pub fn into_inner(self) -> (T, T) { + (self.start, self.end) + } +} + +/// Block information. +#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)] +pub struct BlockInfo { + pub number: u64, + pub hash: H256, + pub parent_hash: H256, +} + +impl TryFrom> for BlockInfo { + type Error = anyhow::Error; + + fn try_from(value: Block) -> std::result::Result { + Ok(Self { + number: value.number.context("block missing number")?.as_u64(), + hash: value.hash.context("block missing hash")?, + parent_hash: value.parent_hash, + }) + } +} + +/// Creates a cloneable stream that yields the current block whenever it +/// changes. +/// +/// The stream is not guaranteed to yield *every* block individually without +/// gaps but it does yield the newest block whenever it detects a block number +/// increase. In practice this means that if the node changes the current block +/// in quick succession we might only observe the last block, skipping some +/// blocks in between. +/// +/// The stream is cloneable so that we only have to poll the node once while +/// being able to share the result with several consumers. Calling this function +/// again would create a new poller so it is preferable to clone an existing +/// stream instead. +pub async fn current_block_stream( + retriever: Arc, + poll_interval: Duration, +) -> Result { + let first_block = retriever.current_block().await?; + tracing::debug!(number=%first_block.number, hash=?first_block.hash, "polled block"); + + let (sender, receiver) = watch::channel(first_block); + let update_future = async move { + let mut previous_block = first_block; + loop { + tokio::time::sleep(poll_interval).await; + let block = match retriever.current_block().await { + Ok(block) => block, + Err(err) => { + tracing::warn!("failed to get current block: {:?}", err); + continue; + } + }; + + // If the block is exactly the same, ignore it. + if previous_block.hash == block.hash { + continue; + } + + // The new block is different but might still have the same number. + + tracing::debug!(number=%block.number, hash=?block.hash, "polled block"); + update_block_metrics(previous_block.number, block.number); + + // Only update the stream if the number has increased. + if block.number <= previous_block.number { + continue; + } + + if sender.send(block).is_err() { + tracing::debug!("exiting polling loop"); + break; + } + + previous_block = block; + } + }; + + tokio::task::spawn(update_future.instrument(tracing::info_span!("current_block_stream"))); + Ok(receiver) +} + +/// A method for creating a block stream with an initial value that never +/// observes any new blocks. This is useful for testing and creating "mock" +/// components. +pub fn mock_single_block(block: BlockInfo) -> CurrentBlockStream { + let (sender, receiver) = watch::channel(block); + // Make sure the `sender` never drops so the `receiver` stays open. + std::mem::forget(sender); + receiver +} + +pub type CurrentBlockStream = watch::Receiver; + +pub fn into_stream(receiver: CurrentBlockStream) -> WatchStream { + WatchStream::new(receiver) +} + +/// Trait for abstracting the retrieval of the block information such as the +/// latest block number. +#[async_trait::async_trait] +pub trait BlockRetrieving: Send + Sync + 'static { + async fn current_block(&self) -> Result; + async fn block(&self, number: u64) -> Result; + async fn blocks(&self, range: RangeInclusive) -> Result>; +} + +#[async_trait::async_trait] +impl BlockRetrieving for Web3 { + async fn current_block(&self) -> Result { + get_block_info_at_id(self, BlockNumber::Latest.into()).await + } + + async fn block(&self, number: u64) -> Result { + let block = get_block_info_at_id(self, U64::from(number).into()).await?; + Ok((block.number, block.hash)) + } + + /// get blocks defined by the range (inclusive) + /// if successful, function guarantees full range of blocks in Result (does + /// not return partial results) + async fn blocks(&self, range: RangeInclusive) -> Result> { + let include_txs = helpers::serialize(&false); + let (start, end) = range.into_inner(); + let mut batch_request = Vec::with_capacity((end - start + 1) as usize); + for i in start..=end { + let num = helpers::serialize(&BlockNumber::Number(i.into())); + let request = self + .transport() + .prepare("eth_getBlockByNumber", vec![num, include_txs.clone()]); + batch_request.push(request); + } + + // send_batch guarantees the size and order of the responses to match the + // requests + self.transport() + .send_batch(batch_request.iter().cloned()) + .await? + .into_iter() + .map(|response| match response { + Ok(response) => { + serde_json::from_value::>(response.clone()) + .with_context(|| format!("unexpected response format: {response:?}")) + .and_then(|response| { + Ok(( + response.number.context("missing block number")?.as_u64(), + response.hash.context("missing hash")?, + )) + }) + } + Err(err) => Err(anyhow!("web3 error: {}", err)), + }) + .collect() + } +} + +async fn get_block_info_at_id(web3: &Web3, id: BlockId) -> Result { + web3.eth() + .block(id) + .await + .with_context(|| format!("failed to get block for {id:?}"))? + .with_context(|| format!("no block for {id:?}"))? + .try_into() +} + +pub async fn timestamp_of_block_in_seconds(web3: &Web3, block_number: BlockNumber) -> Result { + Ok(web3 + .eth() + .block(block_number.into()) + .await + .context("failed to get latest block")? + .context("block should exists")? + .timestamp + .as_u32()) +} + +pub async fn timestamp_of_current_block_in_seconds(web3: &Web3) -> Result { + timestamp_of_block_in_seconds(web3, BlockNumber::Latest).await +} + +pub async fn block_number_to_block_number_hash( + web3: &Web3, + block_number: BlockNumber, +) -> Option { + web3.eth() + .block(BlockId::Number(block_number)) + .await + .ok() + .flatten() + .map(|block| { + ( + block.number.expect("number must exist").as_u64(), + block.hash.expect("hash must exist"), + ) + }) +} + +#[derive(prometheus_metric_storage::MetricStorage)] +pub struct Metrics { + /// How much a new block number differs from the current block number. + #[metric(buckets(0., 1., 2., 4., 8., 25.), labels("sign"))] + block_stream_update_delta: prometheus::HistogramVec, +} + +/// Updates metrics about the difference of the new block number compared to the +/// current block. +fn update_block_metrics(current_block: u64, new_block: u64) { + let metric = &Metrics::instance(observe::metrics::get_storage_registry()) + .unwrap() + .block_stream_update_delta; + + let delta = (i128::from(new_block) - i128::from(current_block)) as f64; + if delta <= 0. { + metric.with_label_values(&["negative"]).observe(delta.abs()); + } else { + metric.with_label_values(&["positive"]).observe(delta.abs()); + } +} + +#[cfg(test)] +mod tests { + use { + super::*, + crate::{create_env_test_transport, create_test_transport}, + futures::StreamExt, + }; + + #[tokio::test] + #[ignore] + async fn mainnet() { + observe::tracing::initialize_reentrant("shared=debug"); + let node = std::env::var("NODE_URL").unwrap(); + let transport = create_test_transport(&node); + let web3 = Web3::new(transport); + let receiver = current_block_stream(Arc::new(web3), Duration::from_secs(1)) + .await + .unwrap(); + let mut stream = into_stream(receiver); + for _ in 0..3 { + let block = stream.next().await.unwrap(); + println!("new block number {}", block.number); + } + } + + #[tokio::test] + #[ignore] + async fn current_blocks_test() { + let transport = create_env_test_transport(); + let web3 = Web3::new(transport); + + // single block + let range = RangeInclusive::try_new(5, 5).unwrap(); + let blocks = web3.blocks(range).await.unwrap(); + assert_eq!(blocks.len(), 1); + assert_eq!(blocks.last().unwrap().0, 5); + + // multiple blocks + let range = RangeInclusive::try_new(5, 8).unwrap(); + let blocks = web3.blocks(range).await.unwrap(); + assert_eq!(blocks.len(), 4); + assert_eq!(blocks.last().unwrap().0, 8); + assert_eq!(blocks.first().unwrap().0, 5); + + // shortened blocks + let current_block_number = 5u64; + let length = 25u64; + let range = RangeInclusive::try_new( + current_block_number.saturating_sub(length), + current_block_number, + ) + .unwrap(); + let blocks = web3.blocks(range).await.unwrap(); + assert_eq!(blocks.len(), 6); + assert_eq!(blocks.last().unwrap().0, 5); + assert_eq!(blocks.first().unwrap().0, 0); + } +} diff --git a/crates/ethrpc/src/current_block/retriever.rs b/crates/ethrpc/src/current_block/retriever.rs new file mode 100644 index 0000000000..d228331982 --- /dev/null +++ b/crates/ethrpc/src/current_block/retriever.rs @@ -0,0 +1,130 @@ +use { + super::{BlockInfo, BlockNumberHash, BlockRetrieving, RangeInclusive}, + crate::Web3, + anyhow::{bail, Context, Result}, + contracts::support::FetchBlock, + primitive_types::{H256, U256}, + web3::{ + transports::Batch, + types::{BlockNumber, CallRequest}, + }, +}; + +/// A hybrid `eth_getBlock` and `eth_call` based block fetcher. +/// +/// This is similar to the `eth_call` based fetcher, in that it can be used +/// for nodes where `eth_getBlockBy*` and `eth_blockNumber` calls return the +/// latest block for which a header is available even if the state isn't. +/// +/// However, some nodes (notably Nethermind) do **not** support `eth_call` on +/// the pending block, which is required for the `eth_call` based fetcher to +/// work. As a work-around, we issue simultaneous `eth_call` and `eth_getBlock` +/// requests to fetch the full block header (which includes the hash) and +/// simulate code on the latest block for which there is state. This gives us +/// the best of both worlds at the cost of an extra request per "poll". +pub struct BlockRetriever(pub Web3); + +#[async_trait::async_trait] +impl BlockRetrieving for BlockRetriever { + async fn current_block(&self) -> Result { + let (return_data, block) = { + let batch = web3::Web3::new(Batch::new(self.0.transport().clone())); + + let bytecode = ::raw_contract().bytecode.to_bytes().unwrap(); + let return_data = batch.eth().call( + CallRequest { + data: Some(bytecode), + ..Default::default() + }, + Some(BlockNumber::Latest.into()), + ); + let block = batch.eth().block(BlockNumber::Latest.into()); + + batch.transport().submit_batch().await?; + + ( + return_data.await?.0, + block.await?.context("missing latest block")?, + ) + }; + + let call = decode( + return_data + .as_slice() + .try_into() + .context("unexpected block fetch return length")?, + )?; + let fetch = BlockInfo::try_from(block)?; + + // The `FetchBlock` contract works by returning `block.number - 1`, its + // hash, and its parent's hash. This means that, if we call it with + // `latest`, then `call.number` will be the block `latest - 1`. + // + // We accept a few cases here: + // 1. If `call.number + 1 >= fetch.number`, this means that the state for the + // `fetch.number` block is available (as the `eth_call` executed on that + // block or later). Hence, `Ok(fetch)` is the current block. + // 2. If `call.number + 1 == fetch.number - 1`, then there is a 2 block + // differential between `call` and `fetch`, meaning that the `fetch.number` + // block header is available but not its state, so return the `fetch.number - + // 1` as the current block. + // 3. Unexpectedly large differential between `call` and `fetch`. + if call.number.saturating_add(1) >= fetch.number { + Ok(fetch) + } else if call.number.saturating_add(1) == fetch.number.saturating_sub(1) { + Ok(BlockInfo { + number: fetch.number.saturating_sub(1), + hash: fetch.parent_hash, + parent_hash: call.hash, + }) + } else { + bail!("large differential between eth_getBlock {fetch:?} and eth_call {call:?}"); + } + } + + async fn block(&self, number: u64) -> Result { + self.0.block(number).await + } + + async fn blocks(&self, range: RangeInclusive) -> Result> { + self.0.blocks(range).await + } +} + +/// Decodes the return data from the `FetchBlock` contract. +fn decode(return_data: [u8; 96]) -> Result { + let number = u64::try_from(U256::from_big_endian(&return_data[0..32])) + .ok() + .context("block number overflows u64")?; + let hash = H256::from_slice(&return_data[32..64]); + let parent_hash = H256::from_slice(&return_data[64..96]); + + Ok(BlockInfo { + number, + hash, + parent_hash, + }) +} + +#[cfg(test)] +mod tests { + use {super::*, crate::create_env_test_transport}; + + #[ignore] + #[tokio::test] + async fn node() { + let retriever = BlockRetriever(Web3::new(create_env_test_transport())); + + let mut block = Option::::None; + for _ in 0..3 { + loop { + let current = retriever.current_block().await.unwrap(); + if block.is_none() || matches!(block, Some(b) if b < current.number) { + println!("current block: {current:#?}"); + block = Some(current.number); + break; + } + } + } + } +} diff --git a/crates/ethrpc/src/lib.rs b/crates/ethrpc/src/lib.rs index 4d5fddb9af..ddb056b645 100644 --- a/crates/ethrpc/src/lib.rs +++ b/crates/ethrpc/src/lib.rs @@ -1,4 +1,5 @@ pub mod buffered; +pub mod current_block; pub mod dummy; pub mod extensions; pub mod http; diff --git a/crates/refunder/Cargo.toml b/crates/refunder/Cargo.toml index 3234e548f5..61d0d5fa0c 100644 --- a/crates/refunder/Cargo.toml +++ b/crates/refunder/Cargo.toml @@ -13,6 +13,7 @@ clap = { workspace = true } contracts = { path = "../contracts" } database = { path = "../database" } ethcontract = { workspace = true } +ethrpc = { path = "../ethrpc" } futures = {workspace = true} gas-estimation = { workspace = true } lazy_static = { workspace = true } diff --git a/crates/refunder/src/refund_service.rs b/crates/refunder/src/refund_service.rs index c7d65393ae..fbe5aa1ba0 100644 --- a/crates/refunder/src/refund_service.rs +++ b/crates/refunder/src/refund_service.rs @@ -9,11 +9,13 @@ use { OrderUid, }, ethcontract::{Account, H160, H256}, - futures::{stream, StreamExt}, - shared::{ + ethrpc::{ current_block::timestamp_of_current_block_in_seconds, - ethrpc::{Web3, Web3CallBatch, MAX_BATCH_SIZE}, + Web3, + Web3CallBatch, + MAX_BATCH_SIZE, }, + futures::{stream, StreamExt}, sqlx::PgPool, }; diff --git a/crates/shared/src/account_balances/arguments.rs b/crates/shared/src/account_balances/arguments.rs index d5652c84a9..3ee3a5bbc3 100644 --- a/crates/shared/src/account_balances/arguments.rs +++ b/crates/shared/src/account_balances/arguments.rs @@ -3,11 +3,10 @@ use { crate::{ arguments::{display_option, CodeSimulatorKind}, code_simulation::{CodeSimulating, TenderlyCodeSimulator, Web3ThenTenderly}, - current_block::CurrentBlockStream, - ethrpc::Web3, tenderly_api::TenderlyApi, }, ethcontract::H160, + ethrpc::{current_block::CurrentBlockStream, Web3}, std::{ fmt::{self, Display, Formatter}, sync::Arc, diff --git a/crates/shared/src/account_balances/cached.rs b/crates/shared/src/account_balances/cached.rs index 088b515a94..4bf889d016 100644 --- a/crates/shared/src/account_balances/cached.rs +++ b/crates/shared/src/account_balances/cached.rs @@ -1,9 +1,7 @@ use { - crate::{ - account_balances::{BalanceFetching, Query, TransferSimulationError}, - current_block::{into_stream, CurrentBlockStream}, - }, + crate::account_balances::{BalanceFetching, Query, TransferSimulationError}, anyhow::Result, + ethrpc::current_block::{into_stream, CurrentBlockStream}, futures::StreamExt, itertools::Itertools, primitive_types::U256, @@ -202,8 +200,9 @@ impl BalanceFetching for CachingBalanceFetcher { mod tests { use { super::*, - crate::{account_balances::MockBalanceFetching, current_block::BlockInfo}, + crate::account_balances::MockBalanceFetching, ethcontract::H160, + ethrpc::current_block::BlockInfo, model::order::SellTokenSource, }; diff --git a/crates/shared/src/arguments.rs b/crates/shared/src/arguments.rs index 9020a5bc58..7655bb4760 100644 --- a/crates/shared/src/arguments.rs +++ b/crates/shared/src/arguments.rs @@ -4,8 +4,6 @@ use { crate::{ account_balances, - current_block, - ethrpc, gas_price_estimation::GasEstimatorType, price_estimation::PriceEstimators, rate_limiter::RateLimitingStrategy, @@ -140,10 +138,10 @@ logging_args_with_default_filter!( #[group(skip)] pub struct Arguments { #[clap(flatten)] - pub ethrpc: ethrpc::Arguments, + pub ethrpc: crate::ethrpc::Arguments, #[clap(flatten)] - pub current_block: current_block::Arguments, + pub current_block: crate::current_block::Arguments, #[clap(flatten)] pub tenderly: tenderly_api::Arguments, diff --git a/crates/shared/src/contracts.rs b/crates/shared/src/contracts.rs index f6d6f20e94..d1d0f0b4c8 100644 --- a/crates/shared/src/contracts.rs +++ b/crates/shared/src/contracts.rs @@ -1,14 +1,14 @@ use { - crate::{ - current_block::{block_number_to_block_number_hash, BlockNumberHash}, - ethrpc::Web3, - }, anyhow::{anyhow, bail, Result}, contracts::GPv2Settlement, ethcontract::{ common::{contract::Network, DeploymentInformation}, Contract, }, + ethrpc::{ + current_block::{block_number_to_block_number_hash, BlockNumberHash}, + Web3, + }, web3::types::U64, }; diff --git a/crates/shared/src/current_block.rs b/crates/shared/src/current_block.rs index 0689bfa468..d0d09f39f6 100644 --- a/crates/shared/src/current_block.rs +++ b/crates/shared/src/current_block.rs @@ -1,325 +1,57 @@ -mod arguments; -mod retriever; +//! Global block stream arguments. use { - crate::ethrpc::Web3, - anyhow::{anyhow, ensure, Context as _, Result}, - primitive_types::H256, - std::{sync::Arc, time::Duration}, - tokio::sync::watch, - tokio_stream::wrappers::WatchStream, - tracing::Instrument, - web3::{ - helpers, - types::{Block, BlockId, BlockNumber, U64}, - BatchTransport, - Transport, + crate::arguments::duration_from_seconds, + anyhow::Result, + clap::Parser, + ethrpc::{ + current_block::{current_block_stream, retriever, BlockRetrieving, CurrentBlockStream}, + Web3, + }, + std::{ + fmt::{self, Display, Formatter}, + sync::Arc, + time::Duration, }, }; -pub use self::arguments::Arguments; - -pub type BlockNumberHash = (u64, H256); - -#[derive(Debug, Clone, PartialEq, Eq)] -pub struct RangeInclusive { - start: T, - end: T, -} - -impl RangeInclusive { - pub fn try_new(start: T, end: T) -> Result { - ensure!(end >= start, "end has to be bigger or equal to start"); - Ok(Self { start, end }) - } - - pub fn start(&self) -> &T { - &self.start - } - - pub fn end(&self) -> &T { - &self.end - } - - pub fn into_inner(self) -> (T, T) { - (self.start, self.end) - } -} - -/// Block information. -#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)] -pub struct BlockInfo { - pub number: u64, - pub hash: H256, - pub parent_hash: H256, -} - -impl TryFrom> for BlockInfo { - type Error = anyhow::Error; - - fn try_from(value: Block) -> std::result::Result { - Ok(Self { - number: value.number.context("block missing number")?.as_u64(), - hash: value.hash.context("block missing hash")?, - parent_hash: value.parent_hash, - }) - } -} - -/// Creates a cloneable stream that yields the current block whenever it -/// changes. -/// -/// The stream is not guaranteed to yield *every* block individually without -/// gaps but it does yield the newest block whenever it detects a block number -/// increase. In practice this means that if the node changes the current block -/// in quick succession we might only observe the last block, skipping some -/// blocks in between. -/// -/// The stream is cloneable so that we only have to poll the node once while -/// being able to share the result with several consumers. Calling this function -/// again would create a new poller so it is preferable to clone an existing -/// stream instead. -pub async fn current_block_stream( - retriever: Arc, - poll_interval: Duration, -) -> Result { - let first_block = retriever.current_block().await?; - tracing::debug!(number=%first_block.number, hash=?first_block.hash, "polled block"); - - let (sender, receiver) = watch::channel(first_block); - let update_future = async move { - let mut previous_block = first_block; - loop { - tokio::time::sleep(poll_interval).await; - let block = match retriever.current_block().await { - Ok(block) => block, - Err(err) => { - tracing::warn!("failed to get current block: {:?}", err); - continue; - } - }; - - // If the block is exactly the same, ignore it. - if previous_block.hash == block.hash { - continue; - } - - // The new block is different but might still have the same number. - - tracing::debug!(number=%block.number, hash=?block.hash, "polled block"); - update_block_metrics(previous_block.number, block.number); - - // Only update the stream if the number has increased. - if block.number <= previous_block.number { - continue; - } - - if sender.send(block).is_err() { - tracing::debug!("exiting polling loop"); - break; - } - - previous_block = block; - } - }; - - tokio::task::spawn(update_future.instrument(tracing::info_span!("current_block_stream"))); - Ok(receiver) -} - -/// A method for creating a block stream with an initial value that never -/// observes any new blocks. This is useful for testing and creating "mock" -/// components. -pub fn mock_single_block(block: BlockInfo) -> CurrentBlockStream { - let (sender, receiver) = watch::channel(block); - // Make sure the `sender` never drops so the `receiver` stays open. - std::mem::forget(sender); - receiver -} - -pub type CurrentBlockStream = watch::Receiver; - -pub fn into_stream(receiver: CurrentBlockStream) -> WatchStream { - WatchStream::new(receiver) -} - -/// Trait for abstracting the retrieval of the block information such as the -/// latest block number. -#[async_trait::async_trait] -pub trait BlockRetrieving: Send + Sync + 'static { - async fn current_block(&self) -> Result; - async fn block(&self, number: u64) -> Result; - async fn blocks(&self, range: RangeInclusive) -> Result>; -} - -#[async_trait::async_trait] -impl BlockRetrieving for Web3 { - async fn current_block(&self) -> Result { - get_block_info_at_id(self, BlockNumber::Latest.into()).await - } - - async fn block(&self, number: u64) -> Result { - let block = get_block_info_at_id(self, U64::from(number).into()).await?; - Ok((block.number, block.hash)) - } - - /// get blocks defined by the range (inclusive) - /// if successful, function guarantees full range of blocks in Result (does - /// not return partial results) - async fn blocks(&self, range: RangeInclusive) -> Result> { - let include_txs = helpers::serialize(&false); - let (start, end) = range.into_inner(); - let mut batch_request = Vec::with_capacity((end - start + 1) as usize); - for i in start..=end { - let num = helpers::serialize(&BlockNumber::Number(i.into())); - let request = self - .transport() - .prepare("eth_getBlockByNumber", vec![num, include_txs.clone()]); - batch_request.push(request); - } - - // send_batch guarantees the size and order of the responses to match the - // requests - self.transport() - .send_batch(batch_request.iter().cloned()) - .await? - .into_iter() - .map(|response| match response { - Ok(response) => { - serde_json::from_value::>(response.clone()) - .with_context(|| format!("unexpected response format: {response:?}")) - .and_then(|response| { - Ok(( - response.number.context("missing block number")?.as_u64(), - response.hash.context("missing hash")?, - )) - }) - } - Err(err) => Err(anyhow!("web3 error: {}", err)), - }) - .collect() - } -} - -async fn get_block_info_at_id(web3: &Web3, id: BlockId) -> Result { - web3.eth() - .block(id) - .await - .with_context(|| format!("failed to get block for {id:?}"))? - .with_context(|| format!("no block for {id:?}"))? - .try_into() -} - -pub async fn timestamp_of_block_in_seconds(web3: &Web3, block_number: BlockNumber) -> Result { - Ok(web3 - .eth() - .block(block_number.into()) - .await - .context("failed to get latest block")? - .context("block should exists")? - .timestamp - .as_u32()) -} - -pub async fn timestamp_of_current_block_in_seconds(web3: &Web3) -> Result { - timestamp_of_block_in_seconds(web3, BlockNumber::Latest).await -} - -pub async fn block_number_to_block_number_hash( - web3: &Web3, - block_number: BlockNumber, -) -> Option { - web3.eth() - .block(BlockId::Number(block_number)) +/// Command line arguments for creating global block stream. +#[derive(Debug, Parser)] +#[group(skip)] +pub struct Arguments { + /// How often in seconds we poll the node to check if the current block has + /// changed. + #[clap( + long, + env, + default_value = "5", + value_parser = duration_from_seconds, + )] + pub block_stream_poll_interval_seconds: Duration, +} + +impl Arguments { + pub fn retriever(&self, web3: Web3) -> Arc { + Arc::new(retriever::BlockRetriever(web3)) + } + + pub async fn stream(&self, web3: Web3) -> Result { + current_block_stream( + self.retriever(web3), + self.block_stream_poll_interval_seconds, + ) .await - .ok() - .flatten() - .map(|block| { - ( - block.number.expect("number must exist").as_u64(), - block.hash.expect("hash must exist"), - ) - }) -} - -#[derive(prometheus_metric_storage::MetricStorage)] -pub struct Metrics { - /// How much a new block number differs from the current block number. - #[metric(buckets(0., 1., 2., 4., 8., 25.), labels("sign"))] - block_stream_update_delta: prometheus::HistogramVec, -} - -/// Updates metrics about the difference of the new block number compared to the -/// current block. -fn update_block_metrics(current_block: u64, new_block: u64) { - let metric = &Metrics::instance(observe::metrics::get_storage_registry()) - .unwrap() - .block_stream_update_delta; - - let delta = (i128::from(new_block) - i128::from(current_block)) as f64; - if delta <= 0. { - metric.with_label_values(&["negative"]).observe(delta.abs()); - } else { - metric.with_label_values(&["positive"]).observe(delta.abs()); } } -#[cfg(test)] -mod tests { - use { - super::*, - crate::ethrpc::{create_env_test_transport, create_test_transport}, - futures::StreamExt, - num::Saturating, - }; - - #[tokio::test] - #[ignore] - async fn mainnet() { - observe::tracing::initialize_reentrant("shared=debug"); - let node = std::env::var("NODE_URL").unwrap(); - let transport = create_test_transport(&node); - let web3 = Web3::new(transport); - let receiver = current_block_stream(Arc::new(web3), Duration::from_secs(1)) - .await - .unwrap(); - let mut stream = into_stream(receiver); - for _ in 0..3 { - let block = stream.next().await.unwrap(); - println!("new block number {}", block.number); - } - } - - #[tokio::test] - #[ignore] - async fn current_blocks_test() { - let transport = create_env_test_transport(); - let web3 = Web3::new(transport); - - // single block - let range = RangeInclusive::try_new(5, 5).unwrap(); - let blocks = web3.blocks(range).await.unwrap(); - assert_eq!(blocks.len(), 1); - assert_eq!(blocks.last().unwrap().0, 5); +impl Display for Arguments { + fn fmt(&self, f: &mut Formatter) -> fmt::Result { + writeln!( + f, + "block_stream_poll_interval_seconds: {:?}", + self.block_stream_poll_interval_seconds + )?; - // multiple blocks - let range = RangeInclusive::try_new(5, 8).unwrap(); - let blocks = web3.blocks(range).await.unwrap(); - assert_eq!(blocks.len(), 4); - assert_eq!(blocks.last().unwrap().0, 8); - assert_eq!(blocks.first().unwrap().0, 5); - - // shortened blocks - let current_block_number = 5; - let length = 25; - let range = RangeInclusive::try_new( - current_block_number.saturating_sub(length), - current_block_number, - ) - .unwrap(); - let blocks = web3.blocks(range).await.unwrap(); - assert_eq!(blocks.len(), 6); - assert_eq!(blocks.last().unwrap().0, 5); - assert_eq!(blocks.first().unwrap().0, 0); + Ok(()) } } diff --git a/crates/shared/src/current_block/arguments.rs b/crates/shared/src/current_block/arguments.rs index 4f2df01c43..e69de29bb2 100644 --- a/crates/shared/src/current_block/arguments.rs +++ b/crates/shared/src/current_block/arguments.rs @@ -1,55 +0,0 @@ -//! Global block stream arguments. - -use { - super::{current_block_stream, retriever, BlockRetrieving, CurrentBlockStream}, - crate::arguments::duration_from_seconds, - anyhow::Result, - clap::Parser, - ethrpc::Web3, - std::{ - fmt::{self, Display, Formatter}, - sync::Arc, - time::Duration, - }, -}; - -/// Command line arguments for creating global block stream. -#[derive(Debug, Parser)] -#[group(skip)] -pub struct Arguments { - /// How often in seconds we poll the node to check if the current block has - /// changed. - #[clap( - long, - env, - default_value = "5", - value_parser = duration_from_seconds, - )] - pub block_stream_poll_interval_seconds: Duration, -} - -impl Arguments { - pub fn retriever(&self, web3: Web3) -> Arc { - Arc::new(retriever::BlockRetriever(web3)) - } - - pub async fn stream(&self, web3: Web3) -> Result { - current_block_stream( - self.retriever(web3), - self.block_stream_poll_interval_seconds, - ) - .await - } -} - -impl Display for Arguments { - fn fmt(&self, f: &mut Formatter) -> fmt::Result { - writeln!( - f, - "block_stream_poll_interval_seconds: {:?}", - self.block_stream_poll_interval_seconds - )?; - - Ok(()) - } -} diff --git a/crates/shared/src/event_handling.rs b/crates/shared/src/event_handling.rs index 67df101de9..232437ca84 100644 --- a/crates/shared/src/event_handling.rs +++ b/crates/shared/src/event_handling.rs @@ -1,8 +1,5 @@ use { - crate::{ - current_block::{BlockNumberHash, BlockRetrieving, RangeInclusive}, - maintenance::Maintaining, - }, + crate::maintenance::Maintaining, anyhow::{Context, Error, Result}, ethcontract::{ contract::{AllEventsBuilder, ParseLog}, @@ -11,6 +8,7 @@ use { Event as EthcontractEvent, EventMetadata, }, + ethrpc::current_block::{BlockNumberHash, BlockRetrieving, RangeInclusive}, futures::{future, Stream, StreamExt, TryStreamExt}, std::sync::Arc, tokio::sync::Mutex, @@ -574,12 +572,13 @@ fn track_block_range(range: &str) { mod tests { use { super::*, - crate::{ - current_block::block_number_to_block_number_hash, - ethrpc::{create_env_test_transport, Web3}, - }, contracts::{gpv2_settlement, GPv2Settlement}, ethcontract::{BlockNumber, H256}, + ethrpc::{ + create_env_test_transport, + current_block::block_number_to_block_number_hash, + Web3, + }, std::str::FromStr, }; diff --git a/crates/shared/src/maintenance.rs b/crates/shared/src/maintenance.rs index db118ff096..1af2210a15 100644 --- a/crates/shared/src/maintenance.rs +++ b/crates/shared/src/maintenance.rs @@ -1,6 +1,6 @@ use { - crate::current_block::{self, BlockInfo, CurrentBlockStream}, anyhow::{ensure, Result}, + ethrpc::current_block::{self, BlockInfo, CurrentBlockStream}, futures::{future::join_all, Stream, StreamExt as _}, std::{sync::Arc, time::Duration}, tokio::time, diff --git a/crates/shared/src/price_estimation/http.rs b/crates/shared/src/price_estimation/http.rs index 640c7e5fcd..61dd0890b8 100644 --- a/crates/shared/src/price_estimation/http.rs +++ b/crates/shared/src/price_estimation/http.rs @@ -416,7 +416,6 @@ mod tests { use { super::*, crate::{ - current_block::current_block_stream, gas_price_estimation::FakeGasPriceEstimator, http_solver::{ model::{ExecutedAmmModel, ExecutedOrderModel, InteractionData, UpdatedAmmModel}, @@ -445,7 +444,7 @@ mod tests { anyhow::anyhow, clap::ValueEnum, ethcontract::dyns::DynTransport, - ethrpc::{http::HttpTransport, Web3}, + ethrpc::{current_block::current_block_stream, http::HttpTransport, Web3}, gas_estimation::GasPrice1559, maplit::hashmap, model::order::OrderKind, diff --git a/crates/shared/src/recent_block_cache.rs b/crates/shared/src/recent_block_cache.rs index 78bc681ac5..8090c0d6b7 100644 --- a/crates/shared/src/recent_block_cache.rs +++ b/crates/shared/src/recent_block_cache.rs @@ -25,10 +25,10 @@ //! could simplify this module if it was only used by by the former. use { - crate::current_block::CurrentBlockStream, anyhow::Result, cached::{Cached, SizedCache}, ethcontract::BlockNumber, + ethrpc::current_block::CurrentBlockStream, prometheus::IntCounterVec, std::{ cmp, @@ -359,7 +359,7 @@ where mod tests { use { super::*, - crate::current_block::{self, BlockInfo}, + ethrpc::current_block::{mock_single_block, BlockInfo}, futures::FutureExt, std::sync::Arc, }; @@ -410,7 +410,7 @@ mod tests { fn marks_recently_used() { let fetcher = FakeCacheFetcher::default(); let block_number = 10u64; - let block_stream = current_block::mock_single_block(BlockInfo { + let block_stream = mock_single_block(BlockInfo { number: block_number, ..Default::default() }); @@ -463,7 +463,7 @@ mod tests { let fetcher = FakeCacheFetcher::default(); let values = fetcher.0.clone(); let block_number = 10u64; - let block_stream = current_block::mock_single_block(BlockInfo { + let block_stream = mock_single_block(BlockInfo { number: block_number, ..Default::default() }); @@ -510,7 +510,7 @@ mod tests { let fetcher = FakeCacheFetcher::default(); let values = fetcher.0.clone(); let block_number = 10u64; - let block_stream = current_block::mock_single_block(BlockInfo { + let block_stream = mock_single_block(BlockInfo { number: block_number, ..Default::default() }); @@ -566,7 +566,7 @@ mod tests { let fetcher = FakeCacheFetcher::default(); let values = fetcher.0.clone(); let block_number = 10u64; - let block_stream = current_block::mock_single_block(BlockInfo { + let block_stream = mock_single_block(BlockInfo { number: block_number, ..Default::default() }); @@ -632,7 +632,7 @@ mod tests { fn evicts_old_blocks_from_cache() { let fetcher = FakeCacheFetcher::default(); let block_number = 10u64; - let block_stream = current_block::mock_single_block(BlockInfo { + let block_stream = mock_single_block(BlockInfo { number: block_number, ..Default::default() }); @@ -672,7 +672,7 @@ mod tests { fn respects_max_age_limit_for_recent() { let fetcher = FakeCacheFetcher::default(); let block_number = 10u64; - let block_stream = current_block::mock_single_block(BlockInfo { + let block_stream = mock_single_block(BlockInfo { number: block_number, ..Default::default() }); diff --git a/crates/shared/src/sources/balancer_v2/pool_fetching.rs b/crates/shared/src/sources/balancer_v2/pool_fetching.rs index ca88d69712..c9b505252f 100644 --- a/crates/shared/src/sources/balancer_v2/pool_fetching.rs +++ b/crates/shared/src/sources/balancer_v2/pool_fetching.rs @@ -26,7 +26,6 @@ use { swap::fixed_point::Bfp, }, crate::{ - current_block::{BlockRetrieving, CurrentBlockStream}, ethrpc::{Web3, Web3Transport}, maintenance::Maintaining, recent_block_cache::{Block, CacheConfig}, @@ -49,6 +48,7 @@ use { BalancerV2WeightedPoolFactoryV4, }, ethcontract::{dyns::DynInstance, BlockId, Instance, H160, H256}, + ethrpc::current_block::{BlockRetrieving, CurrentBlockStream}, model::TokenPair, reqwest::Client, std::{ @@ -500,7 +500,6 @@ mod tests { use { super::*, crate::{ - ethrpc, sources::balancer_v2::{ graph_api::{BalancerSubgraphClient, PoolData, PoolType}, pool_init::EmptyPoolInitializer, @@ -535,7 +534,7 @@ mod tests { Arc::new(CachedTokenInfoFetcher::new(Box::new(TokenInfoFetcher { web3: web3.clone(), }))); - let block_stream = crate::current_block::current_block_stream( + let block_stream = ethrpc::current_block::current_block_stream( Arc::new(web3.clone()), Duration::from_secs(1000), ) diff --git a/crates/shared/src/sources/balancer_v2/pool_fetching/cache.rs b/crates/shared/src/sources/balancer_v2/pool_fetching/cache.rs index 906075180c..112cbe5bb4 100644 --- a/crates/shared/src/sources/balancer_v2/pool_fetching/cache.rs +++ b/crates/shared/src/sources/balancer_v2/pool_fetching/cache.rs @@ -6,13 +6,13 @@ use { super::internal::InternalPoolFetching, crate::{ - current_block::CurrentBlockStream, maintenance::Maintaining, recent_block_cache::{Block, CacheConfig, CacheFetching, CacheKey, RecentBlockCache}, sources::balancer_v2::pools::Pool, }, anyhow::Result, ethcontract::H256, + ethrpc::current_block::CurrentBlockStream, std::{collections::HashSet, sync::Arc}, }; diff --git a/crates/shared/src/sources/balancer_v2/pool_fetching/pool_storage.rs b/crates/shared/src/sources/balancer_v2/pool_fetching/pool_storage.rs index 75a7a5ffe1..d047858cda 100644 --- a/crates/shared/src/sources/balancer_v2/pool_fetching/pool_storage.rs +++ b/crates/shared/src/sources/balancer_v2/pool_fetching/pool_storage.rs @@ -19,7 +19,6 @@ use { crate::{ - current_block::RangeInclusive, event_handling::EventStoring, sources::balancer_v2::pools::{common, FactoryIndexing, PoolIndexing}, }, @@ -29,6 +28,7 @@ use { Event as BasePoolFactoryEvent, }, ethcontract::{Event, H160, H256}, + ethrpc::current_block::RangeInclusive, model::TokenPair, std::{ cmp, diff --git a/crates/shared/src/sources/balancer_v2/pool_fetching/registry.rs b/crates/shared/src/sources/balancer_v2/pool_fetching/registry.rs index 579c15abd9..2e5e01774a 100644 --- a/crates/shared/src/sources/balancer_v2/pool_fetching/registry.rs +++ b/crates/shared/src/sources/balancer_v2/pool_fetching/registry.rs @@ -4,9 +4,7 @@ use { super::{internal::InternalPoolFetching, pool_storage::PoolStorage}, crate::{ - current_block::{BlockNumberHash, BlockRetrieving}, ethcontract_error::EthcontractErrorType, - ethrpc::{Web3, Web3CallBatch, Web3Transport, MAX_BATCH_SIZE}, event_handling::{EventHandler, EventRetrieving}, maintenance::Maintaining, recent_block_cache::Block, @@ -20,6 +18,13 @@ use { anyhow::Result, contracts::{balancer_v2_base_pool_factory, BalancerV2BasePoolFactory}, ethcontract::{dyns::DynAllEventsBuilder, errors::MethodError, BlockId, Instance, H256}, + ethrpc::{ + current_block::{BlockNumberHash, BlockRetrieving}, + Web3, + Web3CallBatch, + Web3Transport, + MAX_BATCH_SIZE, + }, futures::future, hex_literal::hex, model::TokenPair, diff --git a/crates/shared/src/sources/uniswap_v2/pool_cache.rs b/crates/shared/src/sources/uniswap_v2/pool_cache.rs index 3515d22d0a..78891375ab 100644 --- a/crates/shared/src/sources/uniswap_v2/pool_cache.rs +++ b/crates/shared/src/sources/uniswap_v2/pool_cache.rs @@ -1,11 +1,11 @@ use { crate::{ - current_block::CurrentBlockStream, maintenance::Maintaining, recent_block_cache::{Block, CacheConfig, CacheFetching, CacheKey, RecentBlockCache}, sources::uniswap_v2::pool_fetching::{Pool, PoolFetching}, }, anyhow::Result, + ethrpc::current_block::CurrentBlockStream, model::TokenPair, std::{collections::HashSet, sync::Arc}, }; diff --git a/crates/shared/src/sources/uniswap_v3/event_fetching.rs b/crates/shared/src/sources/uniswap_v3/event_fetching.rs index 3024e9d0e6..a96511e7f9 100644 --- a/crates/shared/src/sources/uniswap_v3/event_fetching.rs +++ b/crates/shared/src/sources/uniswap_v3/event_fetching.rs @@ -1,9 +1,5 @@ use { - crate::{ - current_block::RangeInclusive, - ethrpc::Web3, - event_handling::{EventRetrieving, EventStoring}, - }, + crate::event_handling::{EventRetrieving, EventStoring}, anyhow::{Context, Result}, contracts::{ uniswap_v3_pool::event_data::{Burn, Mint, Swap}, @@ -19,6 +15,7 @@ use { H160, H256, }, + ethrpc::{current_block::RangeInclusive, Web3}, hex_literal::hex, std::collections::BTreeMap, }; diff --git a/crates/shared/src/sources/uniswap_v3/pool_fetching.rs b/crates/shared/src/sources/uniswap_v3/pool_fetching.rs index e0b1d18dd6..04f7cfe2e4 100644 --- a/crates/shared/src/sources/uniswap_v3/pool_fetching.rs +++ b/crates/shared/src/sources/uniswap_v3/pool_fetching.rs @@ -4,14 +4,16 @@ use { graph_api::{PoolData, Token, UniV3SubgraphClient}, }, crate::{ - current_block::{BlockRetrieving, RangeInclusive}, - ethrpc::Web3, event_handling::{EventHandler, EventStoring, MAX_REORG_BLOCK_COUNT}, maintenance::Maintaining, recent_block_cache::Block, }, anyhow::{Context, Result}, ethcontract::{Event, H160, U256}, + ethrpc::{ + current_block::{BlockRetrieving, RangeInclusive}, + Web3, + }, itertools::{Either, Itertools}, model::{u256_decimal, TokenPair}, num::{rational::Ratio, BigInt, Zero}, diff --git a/crates/solver/src/driver.rs b/crates/solver/src/driver.rs index a3f3b974c7..d6f6fcd82f 100644 --- a/crates/solver/src/driver.rs +++ b/crates/solver/src/driver.rs @@ -17,6 +17,7 @@ use { anyhow::{anyhow, Context, Result}, contracts::GPv2Settlement, ethcontract::Account, + ethrpc::{current_block::CurrentBlockStream, Web3}, futures::future::join_all, gas_estimation::GasPriceEstimating, model::{ @@ -36,8 +37,6 @@ use { primitive_types::{H160, U256}, shared::{ account_balances::BalanceFetching, - current_block::CurrentBlockStream, - ethrpc::Web3, external_prices::ExternalPrices, http_solver::model::{ AuctionResult, From cce8e44e9e7076c4f7b9f7eff6c28502b9222144 Mon Sep 17 00:00:00 2001 From: Martin Beckmann Date: Tue, 29 Aug 2023 08:56:19 +0200 Subject: [PATCH 2/2] Delete old files --- crates/shared/src/current_block/arguments.rs | 0 crates/shared/src/current_block/retriever.rs | 129 ------------------- 2 files changed, 129 deletions(-) delete mode 100644 crates/shared/src/current_block/arguments.rs delete mode 100644 crates/shared/src/current_block/retriever.rs diff --git a/crates/shared/src/current_block/arguments.rs b/crates/shared/src/current_block/arguments.rs deleted file mode 100644 index e69de29bb2..0000000000 diff --git a/crates/shared/src/current_block/retriever.rs b/crates/shared/src/current_block/retriever.rs deleted file mode 100644 index 3a30a34c5a..0000000000 --- a/crates/shared/src/current_block/retriever.rs +++ /dev/null @@ -1,129 +0,0 @@ -use { - super::{BlockInfo, BlockNumberHash, BlockRetrieving, RangeInclusive}, - crate::ethrpc::Web3, - anyhow::{bail, Context, Result}, - contracts::support::FetchBlock, - primitive_types::{H256, U256}, - web3::{ - transports::Batch, - types::{BlockNumber, CallRequest}, - }, -}; - -/// A hybrid `eth_getBlock` and `eth_call` based block fetcher. -/// -/// This is similar to the `eth_call` based fetcher, in that it can be used -/// for nodes where `eth_getBlockBy*` and `eth_blockNumber` calls return the -/// latest block for which a header is available even if the state isn't. -/// -/// However, some nodes (notably Nethermind) do **not** support `eth_call` on -/// the pending block, which is required for the `eth_call` based fetcher to -/// work. As a work-around, we issue simultaneous `eth_call` and `eth_getBlock` -/// requests to fetch the full block header (which includes the hash) and -/// simulate code on the latest block for which there is state. This gives us -/// the best of both worlds at the cost of an extra request per "poll". -pub struct BlockRetriever(pub Web3); - -#[async_trait::async_trait] -impl BlockRetrieving for BlockRetriever { - async fn current_block(&self) -> Result { - let (return_data, block) = { - let batch = web3::Web3::new(Batch::new(self.0.transport().clone())); - - let return_data = batch.eth().call( - CallRequest { - data: Some(bytecode!(FetchBlock)), - ..Default::default() - }, - Some(BlockNumber::Latest.into()), - ); - let block = batch.eth().block(BlockNumber::Latest.into()); - - batch.transport().submit_batch().await?; - - ( - return_data.await?.0, - block.await?.context("missing latest block")?, - ) - }; - - let call = decode( - return_data - .as_slice() - .try_into() - .context("unexpected block fetch return length")?, - )?; - let fetch = BlockInfo::try_from(block)?; - - // The `FetchBlock` contract works by returning `block.number - 1`, its - // hash, and its parent's hash. This means that, if we call it with - // `latest`, then `call.number` will be the block `latest - 1`. - // - // We accept a few cases here: - // 1. If `call.number + 1 >= fetch.number`, this means that the state for the - // `fetch.number` block is available (as the `eth_call` executed on that - // block or later). Hence, `Ok(fetch)` is the current block. - // 2. If `call.number + 1 == fetch.number - 1`, then there is a 2 block - // differential between `call` and `fetch`, meaning that the `fetch.number` - // block header is available but not its state, so return the `fetch.number - - // 1` as the current block. - // 3. Unexpectedly large differential between `call` and `fetch`. - if call.number.saturating_add(1) >= fetch.number { - Ok(fetch) - } else if call.number.saturating_add(1) == fetch.number.saturating_sub(1) { - Ok(BlockInfo { - number: fetch.number.saturating_sub(1), - hash: fetch.parent_hash, - parent_hash: call.hash, - }) - } else { - bail!("large differential between eth_getBlock {fetch:?} and eth_call {call:?}"); - } - } - - async fn block(&self, number: u64) -> Result { - self.0.block(number).await - } - - async fn blocks(&self, range: RangeInclusive) -> Result> { - self.0.blocks(range).await - } -} - -/// Decodes the return data from the `FetchBlock` contract. -fn decode(return_data: [u8; 96]) -> Result { - let number = u64::try_from(U256::from_big_endian(&return_data[0..32])) - .ok() - .context("block number overflows u64")?; - let hash = H256::from_slice(&return_data[32..64]); - let parent_hash = H256::from_slice(&return_data[64..96]); - - Ok(BlockInfo { - number, - hash, - parent_hash, - }) -} - -#[cfg(test)] -mod tests { - use {super::*, crate::ethrpc::create_env_test_transport}; - - #[ignore] - #[tokio::test] - async fn node() { - let retriever = BlockRetriever(Web3::new(create_env_test_transport())); - - let mut block = Option::::None; - for _ in 0..3 { - loop { - let current = retriever.current_block().await.unwrap(); - if block.is_none() || matches!(block, Some(b) if b < current.number) { - println!("current block: {current:#?}"); - block = Some(current.number); - break; - } - } - } - } -}