From 76e08ac439b29cc80ea7216e050e60af5e836554 Mon Sep 17 00:00:00 2001 From: Martin Beckmann Date: Thu, 16 Nov 2023 16:24:40 +0300 Subject: [PATCH 01/20] Fixed uniswap like implementation --- .../src/boundary/liquidity/uniswap/v2.rs | 1 + crates/shared/src/sources/uniswap_v2.rs | 1 + .../src/sources/uniswap_v2/pool_fetching.rs | 31 ++++++++++++++----- 3 files changed, 26 insertions(+), 7 deletions(-) diff --git a/crates/driver/src/boundary/liquidity/uniswap/v2.rs b/crates/driver/src/boundary/liquidity/uniswap/v2.rs index f5671a3564..121c21d2ab 100644 --- a/crates/driver/src/boundary/liquidity/uniswap/v2.rs +++ b/crates/driver/src/boundary/liquidity/uniswap/v2.rs @@ -153,6 +153,7 @@ where let pool_fetcher = PoolFetcher { pool_reader: reader(web3.clone(), pair_provider), web3: web3.clone(), + non_existent_pools: Default::default(), }; let pool_cache = Arc::new(PoolCache::new( diff --git a/crates/shared/src/sources/uniswap_v2.rs b/crates/shared/src/sources/uniswap_v2.rs index c3bdb3ae7c..67c444bb8a 100644 --- a/crates/shared/src/sources/uniswap_v2.rs +++ b/crates/shared/src/sources/uniswap_v2.rs @@ -106,6 +106,7 @@ impl UniV2BaselineSourceParameters { let fetcher = pool_fetching::PoolFetcher { pool_reader, web3: web3.clone(), + non_existent_pools: Default::default(), }; Ok(UniV2BaselineSource { router, diff --git a/crates/shared/src/sources/uniswap_v2/pool_fetching.rs b/crates/shared/src/sources/uniswap_v2/pool_fetching.rs index f5599fc205..9222a63994 100644 --- a/crates/shared/src/sources/uniswap_v2/pool_fetching.rs +++ b/crates/shared/src/sources/uniswap_v2/pool_fetching.rs @@ -16,6 +16,7 @@ use { model::TokenPair, num::rational::Ratio, std::collections::HashSet, + std::sync::RwLock, }; const POOL_SWAP_GAS_COST: usize = 60_000; @@ -205,6 +206,7 @@ impl BaselineSolvable for Pool { pub struct PoolFetcher { pub pool_reader: Reader, pub web3: Web3, + pub non_existent_pools: RwLock>, } impl PoolFetcher { @@ -216,6 +218,7 @@ impl PoolFetcher { web3: web3.clone(), }, web3, + non_existent_pools: Default::default(), } } } @@ -226,19 +229,33 @@ where Reader: PoolReading, { async fn fetch(&self, token_pairs: HashSet, at_block: Block) -> Result> { + let mut token_pairs: Vec<_> = token_pairs.into_iter().collect(); + { + let non_existent_pools = self.non_existent_pools.read().unwrap(); + token_pairs.retain(|pair| !non_existent_pools.contains(pair)); + } let mut batch = Web3CallBatch::new(self.web3.transport().clone()); let block = BlockId::Number(at_block.into()); let futures = token_pairs - .into_iter() - .map(|pair| self.pool_reader.read_state(pair, &mut batch, block)) + .iter() + .map(|pair| self.pool_reader.read_state(*pair, &mut batch, block)) .collect::>(); batch.execute_all(MAX_BATCH_SIZE).await; - future::join_all(futures) - .await - .into_iter() - .filter_map(|pool| pool.transpose()) - .collect() + let results = future::try_join_all(futures).await?; + + let mut new_missing_pairs = vec![]; + let mut pools = vec![]; + for (result, key) in results.into_iter().zip(token_pairs) { + match result { + Some(pool) => pools.push(pool), + None => new_missing_pairs.push(key), + } + } + if !new_missing_pairs.is_empty() { + self.non_existent_pools.write().unwrap().extend(new_missing_pairs); + } + Ok(pools) } } From 7434628f347866bdd4107e27902dca223f64534f Mon Sep 17 00:00:00 2001 From: Martin Beckmann Date: Thu, 16 Nov 2023 17:28:20 +0300 Subject: [PATCH 02/20] Omit requests for missing balancer pools --- .../balancer_v2/pool_fetching/registry.rs | 65 +++++++++++++------ 1 file changed, 45 insertions(+), 20 deletions(-) diff --git a/crates/shared/src/sources/balancer_v2/pool_fetching/registry.rs b/crates/shared/src/sources/balancer_v2/pool_fetching/registry.rs index 2e5e01774a..e8390f85ea 100644 --- a/crates/shared/src/sources/balancer_v2/pool_fetching/registry.rs +++ b/crates/shared/src/sources/balancer_v2/pool_fetching/registry.rs @@ -15,6 +15,7 @@ use { PoolStatus, }, }, + crate::sources::balancer_v2::pools::PoolIndexing, anyhow::Result, contracts::{balancer_v2_base_pool_factory, BalancerV2BasePoolFactory}, ethcontract::{dyns::DynAllEventsBuilder, errors::MethodError, BlockId, Instance, H256}, @@ -30,6 +31,8 @@ use { model::TokenPair, std::{collections::HashSet, sync::Arc}, tokio::sync::Mutex, + std::sync::RwLock, + futures::FutureExt, }; pub struct BasePoolFactoryContract(BalancerV2BasePoolFactory); @@ -64,6 +67,7 @@ where web3: Web3, fetcher: Arc>, updater: PoolUpdater, + non_existent_pools: RwLock>, } impl Registry @@ -89,6 +93,7 @@ where web3, fetcher, updater, + non_existent_pools: Default::default(), } } } @@ -106,20 +111,31 @@ where .pool_ids_for_token_pairs(&token_pairs) } - async fn pools_by_id(&self, pool_ids: HashSet, block: Block) -> Result> { + async fn pools_by_id(&self, mut pool_ids: HashSet, block: Block) -> Result> { + { + let non_existent_pools = self.non_existent_pools.read().unwrap(); + pool_ids.retain(|id| !non_existent_pools.contains(id)); + } let mut batch = Web3CallBatch::new(self.web3.transport().clone()); let block = BlockId::Number(block.into()); let pool_infos = self.updater.lock().await.store().pools_by_id(&pool_ids); let pool_futures = pool_infos .into_iter() - .map(|pool_info| self.fetcher.fetch_pool(&pool_info, &mut batch, block)) + .map(|pool_info| { + let id = pool_info.common().id; + self.fetcher.fetch_pool(&pool_info, &mut batch, block).map(move |result| (id, result)) + }) .collect::>(); batch.execute_all(MAX_BATCH_SIZE).await; - let pools = future::join_all(pool_futures).await; - collect_pool_results(pools) + let results = future::join_all(pool_futures).await; + let (pools, missing_ids) = collect_pool_results(results)?; + if !missing_ids.is_empty() { + self.non_existent_pools.write().unwrap().extend(missing_ids); + } + Ok(pools) } } @@ -145,15 +161,20 @@ fn base_pool_factory(contract_instance: &Instance) -> BalancerV2B ) } -fn collect_pool_results(pools: Vec>) -> Result> { - pools - .into_iter() - .filter_map(|pool| match pool { - Ok(pool) => Some(Ok(pool.active()?)), - Err(err) if is_contract_error(&err) => None, - Err(err) => Some(Err(err)), - }) - .collect() +/// Returns the list of found pools and a list of pool ids that could not be found. +fn collect_pool_results(results: Vec<(H256, Result)>) -> Result<(Vec, Vec)> { + let mut fetched_pools = Vec::with_capacity(results.len()); + let mut missing_ids = vec![]; + for (id, result) in results { + match result { + Ok(PoolStatus::Active(pool)) => fetched_pools.push(pool), + Ok(PoolStatus::Disabled) => missing_ids.push(id), + Ok( PoolStatus::Paused) => {}, + Err(err) if is_contract_error(&err) => missing_ids.push(id), + Err(err) => return Err(err), + } + } + Ok((fetched_pools, missing_ids)) } fn is_contract_error(err: &anyhow::Error) -> bool { @@ -180,23 +201,27 @@ mod tests { #[tokio::test] async fn collecting_results_filters_paused_pools_and_contract_errors() { let results = vec![ - Ok(PoolStatus::Active(Pool { + (Default::default(), Ok(PoolStatus::Active(Pool { id: Default::default(), kind: PoolKind::Weighted(weighted::PoolState { tokens: Default::default(), swap_fee: Bfp::zero(), version: Default::default(), }), - })), - Ok(PoolStatus::Paused), - Err(ethcontract_error::testing_contract_error().into()), + }))), + (Default::default(), Ok(PoolStatus::Paused)), + (Default::default(), Err(ethcontract_error::testing_contract_error().into())) ]; - assert_eq!(collect_pool_results(results).unwrap().len(), 1); + let (fetched, missing) = collect_pool_results(results).unwrap(); + assert_eq!(fetched.len(), 1); + assert!(missing.is_empty()); } #[tokio::test] async fn collecting_results_forwards_node_error() { - let node_err = Err(ethcontract_error::testing_node_error().into()); - assert!(collect_pool_results(vec![node_err]).is_err()); + let node_err = (Default::default(), Err(ethcontract_error::testing_node_error().into())); + let (fetched, missing) = collect_pool_results(vec![node_err]).unwrap(); + assert!(fetched.is_empty()); + assert_eq!(missing, vec![Default::default()]); } } From 8a03d12dd3dadf2e168b44e11d85f427e4540843 Mon Sep 17 00:00:00 2001 From: Martin Beckmann Date: Fri, 17 Nov 2023 16:27:07 +0300 Subject: [PATCH 03/20] cargo fmt and fix unit tests --- .../balancer_v2/pool_fetching/registry.rs | 64 ++++++++++++------- .../src/sources/uniswap_v2/pool_fetching.rs | 8 ++- 2 files changed, 46 insertions(+), 26 deletions(-) diff --git a/crates/shared/src/sources/balancer_v2/pool_fetching/registry.rs b/crates/shared/src/sources/balancer_v2/pool_fetching/registry.rs index e8390f85ea..493a123922 100644 --- a/crates/shared/src/sources/balancer_v2/pool_fetching/registry.rs +++ b/crates/shared/src/sources/balancer_v2/pool_fetching/registry.rs @@ -12,10 +12,10 @@ use { common::PoolInfoFetching, FactoryIndexing, Pool, + PoolIndexing, PoolStatus, }, }, - crate::sources::balancer_v2::pools::PoolIndexing, anyhow::Result, contracts::{balancer_v2_base_pool_factory, BalancerV2BasePoolFactory}, ethcontract::{dyns::DynAllEventsBuilder, errors::MethodError, BlockId, Instance, H256}, @@ -26,13 +26,14 @@ use { Web3Transport, MAX_BATCH_SIZE, }, - futures::future, + futures::{future, FutureExt}, hex_literal::hex, model::TokenPair, - std::{collections::HashSet, sync::Arc}, + std::{ + collections::HashSet, + sync::{Arc, RwLock}, + }, tokio::sync::Mutex, - std::sync::RwLock, - futures::FutureExt, }; pub struct BasePoolFactoryContract(BalancerV2BasePoolFactory); @@ -124,7 +125,9 @@ where .into_iter() .map(|pool_info| { let id = pool_info.common().id; - self.fetcher.fetch_pool(&pool_info, &mut batch, block).map(move |result| (id, result)) + self.fetcher + .fetch_pool(&pool_info, &mut batch, block) + .map(move |result| (id, result)) }) .collect::>(); @@ -161,15 +164,18 @@ fn base_pool_factory(contract_instance: &Instance) -> BalancerV2B ) } -/// Returns the list of found pools and a list of pool ids that could not be found. -fn collect_pool_results(results: Vec<(H256, Result)>) -> Result<(Vec, Vec)> { +/// Returns the list of found pools and a list of pool ids that could not be +/// found. +fn collect_pool_results( + results: Vec<(H256, Result)>, +) -> Result<(Vec, Vec)> { let mut fetched_pools = Vec::with_capacity(results.len()); let mut missing_ids = vec![]; for (id, result) in results { match result { Ok(PoolStatus::Active(pool)) => fetched_pools.push(pool), Ok(PoolStatus::Disabled) => missing_ids.push(id), - Ok( PoolStatus::Paused) => {}, + Ok(PoolStatus::Paused) => {} Err(err) if is_contract_error(&err) => missing_ids.push(id), Err(err) => return Err(err), } @@ -196,32 +202,44 @@ mod tests { swap::fixed_point::Bfp, }, }, + std::str::FromStr, }; #[tokio::test] async fn collecting_results_filters_paused_pools_and_contract_errors() { + let bad_pool = + H256::from_str("e337fcd52afd6b98847baab279cda6c3980fcb185da9e959fd489ffd210eac60") + .unwrap(); let results = vec![ - (Default::default(), Ok(PoolStatus::Active(Pool { - id: Default::default(), - kind: PoolKind::Weighted(weighted::PoolState { - tokens: Default::default(), - swap_fee: Bfp::zero(), - version: Default::default(), - }), - }))), + ( + Default::default(), + Ok(PoolStatus::Active(Pool { + id: Default::default(), + kind: PoolKind::Weighted(weighted::PoolState { + tokens: Default::default(), + swap_fee: Bfp::zero(), + version: Default::default(), + }), + })), + ), (Default::default(), Ok(PoolStatus::Paused)), - (Default::default(), Err(ethcontract_error::testing_contract_error().into())) + ( + bad_pool, + Err(ethcontract_error::testing_contract_error().into()), + ), ]; let (fetched, missing) = collect_pool_results(results).unwrap(); assert_eq!(fetched.len(), 1); - assert!(missing.is_empty()); + assert_eq!(missing, vec![bad_pool]); } #[tokio::test] async fn collecting_results_forwards_node_error() { - let node_err = (Default::default(), Err(ethcontract_error::testing_node_error().into())); - let (fetched, missing) = collect_pool_results(vec![node_err]).unwrap(); - assert!(fetched.is_empty()); - assert_eq!(missing, vec![Default::default()]); + let node_err = ( + Default::default(), + Err(ethcontract_error::testing_node_error().into()), + ); + let result = collect_pool_results(vec![node_err]); + assert!(result.is_err()); } } diff --git a/crates/shared/src/sources/uniswap_v2/pool_fetching.rs b/crates/shared/src/sources/uniswap_v2/pool_fetching.rs index 9222a63994..c56b9a363f 100644 --- a/crates/shared/src/sources/uniswap_v2/pool_fetching.rs +++ b/crates/shared/src/sources/uniswap_v2/pool_fetching.rs @@ -15,8 +15,7 @@ use { }, model::TokenPair, num::rational::Ratio, - std::collections::HashSet, - std::sync::RwLock, + std::{collections::HashSet, sync::RwLock}, }; const POOL_SWAP_GAS_COST: usize = 60_000; @@ -253,7 +252,10 @@ where } } if !new_missing_pairs.is_empty() { - self.non_existent_pools.write().unwrap().extend(new_missing_pairs); + self.non_existent_pools + .write() + .unwrap() + .extend(new_missing_pairs); } Ok(pools) } From 99f9ffae06a71ef959861ebba2c84d653196c99e Mon Sep 17 00:00:00 2001 From: Martin Beckmann Date: Fri, 17 Nov 2023 15:56:09 +0300 Subject: [PATCH 04/20] Remove manual batching from balancer liquidity fetching --- .../balancer_v2/pool_fetching/registry.rs | 11 +-- .../shared/src/sources/balancer_v2/pools.rs | 2 - .../src/sources/balancer_v2/pools/common.rs | 81 +++++++------------ .../balancer_v2/pools/composable_stable.rs | 32 +++----- .../pools/liquidity_bootstrapping.rs | 33 +++----- .../src/sources/balancer_v2/pools/stable.rs | 15 ++-- .../src/sources/balancer_v2/pools/weighted.rs | 14 +--- 7 files changed, 62 insertions(+), 126 deletions(-) diff --git a/crates/shared/src/sources/balancer_v2/pool_fetching/registry.rs b/crates/shared/src/sources/balancer_v2/pool_fetching/registry.rs index 493a123922..c50cb3defb 100644 --- a/crates/shared/src/sources/balancer_v2/pool_fetching/registry.rs +++ b/crates/shared/src/sources/balancer_v2/pool_fetching/registry.rs @@ -21,10 +21,7 @@ use { ethcontract::{dyns::DynAllEventsBuilder, errors::MethodError, BlockId, Instance, H256}, ethrpc::{ current_block::{BlockNumberHash, BlockRetrieving}, - Web3, - Web3CallBatch, Web3Transport, - MAX_BATCH_SIZE, }, futures::{future, FutureExt}, hex_literal::hex, @@ -65,7 +62,6 @@ pub struct Registry where Factory: FactoryIndexing, { - web3: Web3, fetcher: Arc>, updater: PoolUpdater, non_existent_pools: RwLock>, @@ -83,7 +79,6 @@ where initial_pools: Vec, start_sync_at_block: Option, ) -> Self { - let web3 = factory_instance.web3(); let updater = Mutex::new(EventHandler::new( block_retreiver, BasePoolFactoryContract(base_pool_factory(factory_instance)), @@ -91,7 +86,6 @@ where start_sync_at_block, )); Self { - web3, fetcher, updater, non_existent_pools: Default::default(), @@ -117,7 +111,6 @@ where let non_existent_pools = self.non_existent_pools.read().unwrap(); pool_ids.retain(|id| !non_existent_pools.contains(id)); } - let mut batch = Web3CallBatch::new(self.web3.transport().clone()); let block = BlockId::Number(block.into()); let pool_infos = self.updater.lock().await.store().pools_by_id(&pool_ids); @@ -126,13 +119,11 @@ where .map(|pool_info| { let id = pool_info.common().id; self.fetcher - .fetch_pool(&pool_info, &mut batch, block) + .fetch_pool(&pool_info, block) .map(move |result| (id, result)) }) .collect::>(); - batch.execute_all(MAX_BATCH_SIZE).await; - let results = future::join_all(pool_futures).await; let (pools, missing_ids) = collect_pool_results(results)?; if !missing_ids.is_empty() { diff --git a/crates/shared/src/sources/balancer_v2/pools.rs b/crates/shared/src/sources/balancer_v2/pools.rs index e11bb07767..4d7eb26f15 100644 --- a/crates/shared/src/sources/balancer_v2/pools.rs +++ b/crates/shared/src/sources/balancer_v2/pools.rs @@ -15,7 +15,6 @@ pub mod weighted; use { super::graph_api::PoolData, - crate::ethrpc::Web3CallBatch, anyhow::Result, ethcontract::{BlockId, H256}, futures::future::BoxFuture, @@ -116,7 +115,6 @@ pub trait FactoryIndexing: Send + Sync + 'static { // where we can't use other lifetimes here. // common_pool_state: BoxFuture<'static, common::PoolState>, - batch: &mut Web3CallBatch, block: BlockId, ) -> BoxFuture<'static, Result>>; } diff --git a/crates/shared/src/sources/balancer_v2/pools/common.rs b/crates/shared/src/sources/balancer_v2/pools/common.rs index 67b8f8c3f5..bbe3b88279 100644 --- a/crates/shared/src/sources/balancer_v2/pools/common.rs +++ b/crates/shared/src/sources/balancer_v2/pools/common.rs @@ -3,7 +3,6 @@ use { super::{FactoryIndexing, Pool, PoolIndexing as _, PoolStatus}, crate::{ - ethrpc::Web3CallBatch, sources::balancer_v2::{ graph_api::{PoolData, PoolType}, swap::fixed_point::Bfp, @@ -13,7 +12,7 @@ use { anyhow::{anyhow, ensure, Context, Result}, contracts::{BalancerV2BasePool, BalancerV2Vault}, ethcontract::{BlockId, Bytes, H160, H256, U256}, - futures::{future::BoxFuture, FutureExt as _}, + futures::{future::BoxFuture, FutureExt as _, TryFutureExt}, std::{collections::BTreeMap, future::Future, sync::Arc}, tokio::sync::oneshot, }; @@ -34,7 +33,6 @@ where fn fetch_pool( &self, pool: &Factory::PoolInfo, - batch: &mut Web3CallBatch, block: BlockId, ) -> BoxFuture<'static, Result>; } @@ -110,23 +108,20 @@ impl PoolInfoFetcher { fn fetch_common_pool_state( &self, pool: &PoolInfo, - batch: &mut Web3CallBatch, block: BlockId, ) -> BoxFuture<'static, Result> { let pool_contract = self.base_pool_at(pool.address); - let paused = pool_contract + let fetch_paused = pool_contract .get_paused_state() .block(block) - .batch_call(batch); - let swap_fee = pool_contract - .get_swap_fee_percentage() - .block(block) - .batch_call(batch); - let balances = self + .call() + .map_ok(|result| result.0); + let fetch_swap_fee = pool_contract.get_swap_fee_percentage().block(block).call(); + let fetch_balances = self .vault .get_pool_tokens(Bytes(pool.id.0)) .block(block) - .batch_call(batch); + .call(); // Because of a `mockall` limitation, we **need** the future returned // here to be `'static`. This requires us to clone and move `pool` into @@ -134,10 +129,11 @@ impl PoolInfoFetcher { // `pool`, i.e. `'_`. let pool = pool.clone(); async move { - let (paused, _, _) = paused.await?; - let swap_fee = Bfp::from_wei(swap_fee.await?); + let (paused, swap_fee, balances) = + futures::try_join!(fetch_paused, fetch_swap_fee, fetch_balances)?; + let swap_fee = Bfp::from_wei(swap_fee); - let (token_addresses, balances, _) = balances.await?; + let (token_addresses, balances, _) = balances; ensure!(pool.tokens == token_addresses, "pool token mismatch"); let tokens = itertools::izip!(&pool.tokens, balances, &pool.scaling_factors) .map(|(&address, balance, &scaling_factor)| { @@ -180,15 +176,14 @@ where fn fetch_pool( &self, pool_info: &Factory::PoolInfo, - batch: &mut Web3CallBatch, block: BlockId, ) -> BoxFuture<'static, Result> { let pool_id = pool_info.common().id; let (common_pool_state, common_pool_state_ok) = - share_common_pool_state(self.fetch_common_pool_state(pool_info.common(), batch, block)); + share_common_pool_state(self.fetch_common_pool_state(pool_info.common(), block)); let pool_state = self.factory - .fetch_pool_state(pool_info, common_pool_state_ok.boxed(), batch, block); + .fetch_pool_state(pool_info, common_pool_state_ok.boxed(), block); async move { let common_pool_state = common_pool_state.await?; @@ -449,13 +444,10 @@ mod tests { }; let pool_state = { - let mut batch = Web3CallBatch::new(web3.transport().clone()); let block = web3.eth().block_number().await.unwrap(); - let pool_state = - pool_info_fetcher.fetch_common_pool_state(&pool_info, &mut batch, block.into()); + let pool_state = pool_info_fetcher.fetch_common_pool_state(&pool_info, block.into()); - batch.execute_all(100).await; pool_state.await.unwrap() }; @@ -521,13 +513,10 @@ mod tests { }; let pool_state = { - let mut batch = Web3CallBatch::new(web3.transport().clone()); let block = web3.eth().block_number().await.unwrap(); - let pool_state = - pool_info_fetcher.fetch_common_pool_state(&pool_info, &mut batch, block.into()); + let pool_state = pool_info_fetcher.fetch_common_pool_state(&pool_info, block.into()); - batch.execute_all(100).await; pool_state.await }; @@ -607,12 +596,11 @@ mod tests { .with( predicate::eq(pool_info.clone()), predicate::always(), - predicate::always(), predicate::eq(BlockId::from(block)), ) .returning({ let pool_state = pool_state.clone(); - move |_, _, _, _| future::ready(Ok(Some(pool_state.clone()))).boxed() + move |_, _, _| future::ready(Ok(Some(pool_state.clone()))).boxed() }); let pool_info_fetcher = PoolInfoFetcher { @@ -621,13 +609,10 @@ mod tests { token_infos: Arc::new(MockTokenInfoFetching::new()), }; - let pool_status = { - let mut batch = Web3CallBatch::new(web3.transport().clone()); - let pool_state = pool_info_fetcher.fetch_pool(&pool_info, &mut batch, block.into()); - - batch.execute_all(100).await; - pool_state.await.unwrap() - }; + let pool_status = pool_info_fetcher + .fetch_pool(&pool_info, block.into()) + .await + .unwrap(); assert_eq!( pool_status, @@ -661,9 +646,8 @@ mod tests { predicate::always(), predicate::always(), predicate::always(), - predicate::always(), ) - .returning(|_, _, _, _| { + .returning(|_, _, _| { future::ready(Ok(Some(weighted::PoolState { swap_fee: Bfp::zero(), tokens: Default::default(), @@ -689,13 +673,11 @@ mod tests { }; let pool_status = { - let mut batch = Web3CallBatch::new(web3.transport().clone()); let block = web3.eth().block_number().await.unwrap(); - - let pool_state = pool_info_fetcher.fetch_pool(&pool_info, &mut batch, block.into()); - - batch.execute_all(100).await; - pool_state.await.unwrap() + pool_info_fetcher + .fetch_pool(&pool_info, block.into()) + .await + .unwrap() }; assert_eq!(pool_status, PoolStatus::Paused); @@ -724,9 +706,8 @@ mod tests { predicate::always(), predicate::always(), predicate::always(), - predicate::always(), ) - .returning(|_, _, _, _| future::ready(Ok(None)).boxed()); + .returning(|_, _, _| future::ready(Ok(None)).boxed()); let pool_info_fetcher = PoolInfoFetcher { vault: BalancerV2Vault::at(&web3, vault.address()), @@ -745,13 +726,11 @@ mod tests { }; let pool_status = { - let mut batch = Web3CallBatch::new(web3.transport().clone()); let block = web3.eth().block_number().await.unwrap(); - - let pool_state = pool_info_fetcher.fetch_pool(&pool_info, &mut batch, block.into()); - - batch.execute_all(100).await; - pool_state.await.unwrap() + pool_info_fetcher + .fetch_pool(&pool_info, block.into()) + .await + .unwrap() }; assert_eq!(pool_status, PoolStatus::Disabled); diff --git a/crates/shared/src/sources/balancer_v2/pools/composable_stable.rs b/crates/shared/src/sources/balancer_v2/pools/composable_stable.rs index e46176e717..d10c9d22ea 100644 --- a/crates/shared/src/sources/balancer_v2/pools/composable_stable.rs +++ b/crates/shared/src/sources/balancer_v2/pools/composable_stable.rs @@ -2,12 +2,9 @@ use { super::{common, FactoryIndexing, PoolIndexing}, - crate::{ - ethrpc::Web3CallBatch, - sources::balancer_v2::{ - graph_api::{PoolData, PoolType}, - swap::fixed_point::Bfp, - }, + crate::sources::balancer_v2::{ + graph_api::{PoolData, PoolType}, + swap::fixed_point::Bfp, }, anyhow::Result, contracts::{BalancerV2ComposableStablePool, BalancerV2ComposableStablePoolFactory}, @@ -47,7 +44,6 @@ impl FactoryIndexing for BalancerV2ComposableStablePoolFactory { &self, pool_info: &Self::PoolInfo, common_pool_state: BoxFuture<'static, common::PoolState>, - batch: &mut Web3CallBatch, block: BlockId, ) -> BoxFuture<'static, Result>> { let pool_contract = BalancerV2ComposableStablePool::at( @@ -55,20 +51,21 @@ impl FactoryIndexing for BalancerV2ComposableStablePoolFactory { pool_info.common.address, ); - let scaling_factors = pool_contract - .get_scaling_factors() - .block(block) - .batch_call(batch); - let amplification_parameter = pool_contract + let fetch_common = common_pool_state.map(Result::Ok); + let fetch_scaling_factors = pool_contract.get_scaling_factors().block(block).call(); + let fetch_amplification_parameter = pool_contract .get_amplification_parameter() .block(block) - .batch_call(batch); + .call(); async move { - let common = common_pool_state.await; - let scaling_factors = scaling_factors.await?; + let (common, scaling_factors, amplification_parameter) = futures::try_join!( + fetch_common, + fetch_scaling_factors, + fetch_amplification_parameter + )?; let amplification_parameter = { - let (factor, _, precision) = amplification_parameter.await?; + let (factor, _, precision) = amplification_parameter; AmplificationParameter::new(factor, precision)? }; @@ -164,17 +161,14 @@ mod tests { }; let pool_state = { - let mut batch = Web3CallBatch::new(web3.transport().clone()); let block = web3.eth().block_number().await.unwrap(); let pool_state = factory.fetch_pool_state( &pool_info, future::ready(common_pool_state.clone()).boxed(), - &mut batch, block.into(), ); - batch.execute_all(100).await; pool_state.await.unwrap() }; diff --git a/crates/shared/src/sources/balancer_v2/pools/liquidity_bootstrapping.rs b/crates/shared/src/sources/balancer_v2/pools/liquidity_bootstrapping.rs index fe30f8f526..9a37b9c886 100644 --- a/crates/shared/src/sources/balancer_v2/pools/liquidity_bootstrapping.rs +++ b/crates/shared/src/sources/balancer_v2/pools/liquidity_bootstrapping.rs @@ -2,12 +2,9 @@ use { super::{common, FactoryIndexing, PoolIndexing}, - crate::{ - ethrpc::Web3CallBatch, - sources::balancer_v2::{ - graph_api::{PoolData, PoolType}, - swap::fixed_point::Bfp, - }, + crate::sources::balancer_v2::{ + graph_api::{PoolData, PoolType}, + swap::fixed_point::Bfp, }, anyhow::Result, contracts::{ @@ -54,7 +51,6 @@ impl FactoryIndexing for BalancerV2LiquidityBootstrappingPoolFactory { &self, pool_info: &Self::PoolInfo, common_pool_state: BoxFuture<'static, common::PoolState>, - batch: &mut Web3CallBatch, block: BlockId, ) -> BoxFuture<'static, Result>> { let pool_contract = BalancerV2LiquidityBootstrappingPool::at( @@ -62,24 +58,19 @@ impl FactoryIndexing for BalancerV2LiquidityBootstrappingPoolFactory { pool_info.common.address, ); + let fetch_common = common_pool_state.map(Result::Ok); // Liquidity bootstrapping pools use dynamic weights, meaning that we // need to fetch them every time. - let weights = pool_contract - .get_normalized_weights() - .block(block) - .batch_call(batch); - let swap_enabled = pool_contract - .get_swap_enabled() - .block(block) - .batch_call(batch); + let fetch_weights = pool_contract.get_normalized_weights().block(block).call(); + let fetch_swap_enabled = pool_contract.get_swap_enabled().block(block).call(); async move { - if !swap_enabled.await? { + let (common, weights, swap_enabled) = + futures::try_join!(fetch_common, fetch_weights, fetch_swap_enabled)?; + if !swap_enabled { return Ok(None); } - let common = common_pool_state.await; - let weights = weights.await?; let tokens = common .tokens .into_iter() @@ -188,17 +179,14 @@ mod tests { }; let pool_state = { - let mut batch = Web3CallBatch::new(web3.transport().clone()); let block = web3.eth().block_number().await.unwrap(); let pool_state = factory.fetch_pool_state( &pool_info, future::ready(common_pool_state.clone()).boxed(), - &mut batch, block.into(), ); - batch.execute_all(100).await; pool_state.await.unwrap() }; @@ -255,17 +243,14 @@ mod tests { }; let pool_state = { - let mut batch = Web3CallBatch::new(web3.transport().clone()); let block = web3.eth().block_number().await.unwrap(); let pool_state = factory.fetch_pool_state( &pool_info, future::ready(common_pool_state.clone()).boxed(), - &mut batch, block.into(), ); - batch.execute_all(100).await; pool_state.await.unwrap() }; diff --git a/crates/shared/src/sources/balancer_v2/pools/stable.rs b/crates/shared/src/sources/balancer_v2/pools/stable.rs index 118411e809..2c21222dbc 100644 --- a/crates/shared/src/sources/balancer_v2/pools/stable.rs +++ b/crates/shared/src/sources/balancer_v2/pools/stable.rs @@ -4,7 +4,6 @@ use { super::{common, FactoryIndexing, PoolIndexing}, crate::{ conversions::U256Ext as _, - ethrpc::Web3CallBatch, sources::balancer_v2::{ graph_api::{PoolData, PoolType}, swap::fixed_point::Bfp, @@ -91,21 +90,22 @@ impl FactoryIndexing for BalancerV2StablePoolFactoryV2 { &self, pool_info: &Self::PoolInfo, common_pool_state: BoxFuture<'static, common::PoolState>, - batch: &mut Web3CallBatch, block: BlockId, ) -> BoxFuture<'static, Result>> { let pool_contract = BalancerV2StablePool::at(&self.raw_instance().web3(), pool_info.common.address); - let amplification_parameter = pool_contract + let fetch_common = common_pool_state.map(Result::Ok); + let fetch_amplification_parameter = pool_contract .get_amplification_parameter() .block(block) - .batch_call(batch); + .call(); async move { - let common = common_pool_state.await; + let (common, amplification_parameter) = + futures::try_join!(fetch_common, fetch_amplification_parameter)?; let amplification_parameter = { - let (factor, _, precision) = amplification_parameter.await?; + let (factor, _, precision) = amplification_parameter; AmplificationParameter::new(factor, precision)? }; @@ -179,17 +179,14 @@ mod tests { }; let pool_state = { - let mut batch = Web3CallBatch::new(web3.transport().clone()); let block = web3.eth().block_number().await.unwrap(); let pool_state = factory.fetch_pool_state( &pool_info, future::ready(common_pool_state.clone()).boxed(), - &mut batch, block.into(), ); - batch.execute_all(100).await; pool_state.await.unwrap() }; diff --git a/crates/shared/src/sources/balancer_v2/pools/weighted.rs b/crates/shared/src/sources/balancer_v2/pools/weighted.rs index d8c9c7604b..1cc821f65f 100644 --- a/crates/shared/src/sources/balancer_v2/pools/weighted.rs +++ b/crates/shared/src/sources/balancer_v2/pools/weighted.rs @@ -2,12 +2,9 @@ use { super::{common, FactoryIndexing, PoolIndexing}, - crate::{ - ethrpc::Web3CallBatch, - sources::balancer_v2::{ - graph_api::{PoolData, PoolType}, - swap::fixed_point::Bfp, - }, + crate::sources::balancer_v2::{ + graph_api::{PoolData, PoolType}, + swap::fixed_point::Bfp, }, anyhow::{anyhow, Result}, contracts::{ @@ -93,7 +90,6 @@ impl FactoryIndexing for BalancerV2WeightedPoolFactory { &self, pool_info: &Self::PoolInfo, common_pool_state: BoxFuture<'static, common::PoolState>, - _: &mut Web3CallBatch, _: BlockId, ) -> BoxFuture<'static, Result>> { pool_state(Version::V0, pool_info.clone(), common_pool_state) @@ -114,7 +110,6 @@ impl FactoryIndexing for BalancerV2WeightedPoolFactoryV3 { &self, pool_info: &Self::PoolInfo, common_pool_state: BoxFuture<'static, common::PoolState>, - _: &mut Web3CallBatch, _: BlockId, ) -> BoxFuture<'static, Result>> { pool_state(Version::V3Plus, pool_info.clone(), common_pool_state) @@ -284,17 +279,14 @@ mod tests { }; let pool_state = { - let mut batch = Web3CallBatch::new(web3.transport().clone()); let block = web3.eth().block_number().await.unwrap(); let pool_state = factory.fetch_pool_state( &pool_info, future::ready(common_pool_state.clone()).boxed(), - &mut batch, block.into(), ); - batch.execute_all(100).await; pool_state.await.unwrap() }; From 4be15ce854e915c735ba639df2ace104aefe96a9 Mon Sep 17 00:00:00 2001 From: Martin Beckmann Date: Fri, 17 Nov 2023 16:07:39 +0300 Subject: [PATCH 05/20] Drop manual batching from uni v2 liquidity fetching --- crates/shared/src/sources/swapr/reader.rs | 22 +++---- .../src/sources/uniswap_v2/pool_fetching.rs | 57 +++++-------------- 2 files changed, 23 insertions(+), 56 deletions(-) diff --git a/crates/shared/src/sources/swapr/reader.rs b/crates/shared/src/sources/swapr/reader.rs index 0522e81b79..90410db257 100644 --- a/crates/shared/src/sources/swapr/reader.rs +++ b/crates/shared/src/sources/swapr/reader.rs @@ -1,10 +1,7 @@ //! A pool state reading implementation specific to Swapr. use { - crate::{ - ethrpc::Web3CallBatch, - sources::uniswap_v2::pool_fetching::{self, DefaultPoolReader, Pool, PoolReading}, - }, + crate::sources::uniswap_v2::pool_fetching::{self, DefaultPoolReader, Pool, PoolReading}, anyhow::Result, contracts::ISwaprPair, ethcontract::{errors::MethodError, BlockId}, @@ -23,19 +20,18 @@ pub struct SwaprPoolReader(pub DefaultPoolReader); const FEE_BASE: u32 = 10_000; impl PoolReading for SwaprPoolReader { - fn read_state( - &self, - pair: TokenPair, - batch: &mut Web3CallBatch, - block: BlockId, - ) -> BoxFuture<'_, Result>> { + fn read_state(&self, pair: TokenPair, block: BlockId) -> BoxFuture<'_, Result>> { let pair_address = self.0.pair_provider.pair_address(&pair); let pair_contract = ISwaprPair::at(&self.0.web3, pair_address); - let pool = self.0.read_state(pair, batch, block); - let fee = pair_contract.swap_fee().block(block).batch_call(batch); + let fetch_pool = self.0.read_state(pair, block); + let fetch_fee = pair_contract.swap_fee().block(block).call(); - async move { handle_results(pool.await, fee.await) }.boxed() + async move { + let (pool, fee) = futures::join!(fetch_pool, fetch_fee); + handle_results(pool, fee) + } + .boxed() } } diff --git a/crates/shared/src/sources/uniswap_v2/pool_fetching.rs b/crates/shared/src/sources/uniswap_v2/pool_fetching.rs index c56b9a363f..754971b72d 100644 --- a/crates/shared/src/sources/uniswap_v2/pool_fetching.rs +++ b/crates/shared/src/sources/uniswap_v2/pool_fetching.rs @@ -3,7 +3,7 @@ use { crate::{ baseline_solver::BaselineSolvable, ethcontract_error::EthcontractErrorType, - ethrpc::{Web3, Web3CallBatch, MAX_BATCH_SIZE}, + ethrpc::Web3, recent_block_cache::Block, }, anyhow::Result, @@ -36,30 +36,12 @@ pub trait PoolFetching: Send + Sync { /// Trait for abstracting the on-chain reading logic for pool state. pub trait PoolReading: Send + Sync { /// Read the pool state for the specified token pair. - /// - /// The caller specifies a Web3 call back to queue RPC requests into as well - /// as a block number to fetch the data on. - /// - /// This method intentionally **does not** use `async_trait` because - /// implementations are expected to queue up Ethereum RPC calls into the - /// specified batch when the method is called and not when the resulting - /// future is first polled. - fn read_state( - &self, - pair: TokenPair, - batch: &mut Web3CallBatch, - block: BlockId, - ) -> BoxFuture<'_, Result>>; + fn read_state(&self, pair: TokenPair, block: BlockId) -> BoxFuture<'_, Result>>; } impl PoolReading for Box { - fn read_state( - &self, - pair: TokenPair, - batch: &mut Web3CallBatch, - block: BlockId, - ) -> BoxFuture<'_, Result>> { - (**self).read_state(pair, batch, block) + fn read_state(&self, pair: TokenPair, block: BlockId) -> BoxFuture<'_, Result>> { + (**self).read_state(pair, block) } } @@ -233,13 +215,11 @@ where let non_existent_pools = self.non_existent_pools.read().unwrap(); token_pairs.retain(|pair| !non_existent_pools.contains(pair)); } - let mut batch = Web3CallBatch::new(self.web3.transport().clone()); let block = BlockId::Number(at_block.into()); let futures = token_pairs .iter() - .map(|pair| self.pool_reader.read_state(*pair, &mut batch, block)) + .map(|pair| self.pool_reader.read_state(*pair, block)) .collect::>(); - batch.execute_all(MAX_BATCH_SIZE).await; let results = future::try_join_all(futures).await?; @@ -271,12 +251,7 @@ pub struct DefaultPoolReader { } impl PoolReading for DefaultPoolReader { - fn read_state( - &self, - pair: TokenPair, - batch: &mut Web3CallBatch, - block: BlockId, - ) -> BoxFuture<'_, Result>> { + fn read_state(&self, pair: TokenPair, block: BlockId) -> BoxFuture<'_, Result>> { let pair_address = self.pair_provider.pair_address(&pair); let pair_contract = IUniswapLikePair::at(&self.web3, pair_address); @@ -284,23 +259,19 @@ impl PoolReading for DefaultPoolReader { let token0 = ERC20::at(&self.web3, pair.get().0); let token1 = ERC20::at(&self.web3, pair.get().1); - let reserves = pair_contract.get_reserves().block(block).batch_call(batch); - let token0_balance = token0 - .balance_of(pair_address) - .block(block) - .batch_call(batch); - let token1_balance = token1 - .balance_of(pair_address) - .block(block) - .batch_call(batch); + let fetch_reserves = pair_contract.get_reserves().block(block).call(); + let fetch_token0_balance = token0.balance_of(pair_address).block(block).call(); + let fetch_token1_balance = token1.balance_of(pair_address).block(block).call(); async move { + let (reserves, token0_balance, token1_balance) = + futures::join!(fetch_reserves, fetch_token0_balance, fetch_token1_balance); handle_results( FetchedPool { pair, - reserves: reserves.await, - token0_balance: token0_balance.await, - token1_balance: token1_balance.await, + reserves, + token0_balance, + token1_balance, }, pair_address, ) From 4ff95cd8aacecd58a6cddebca44ff74b139ca26a Mon Sep 17 00:00:00 2001 From: Martin Beckmann Date: Fri, 17 Nov 2023 16:48:27 +0300 Subject: [PATCH 06/20] Drop unit tests which required delayed execution of batch calls --- .../balancer_v2/pools/composable_stable.rs | 82 ---------- .../pools/liquidity_bootstrapping.rs | 152 ------------------ .../src/sources/balancer_v2/pools/stable.rs | 73 --------- 3 files changed, 307 deletions(-) diff --git a/crates/shared/src/sources/balancer_v2/pools/composable_stable.rs b/crates/shared/src/sources/balancer_v2/pools/composable_stable.rs index d10c9d22ea..927ed0c8a9 100644 --- a/crates/shared/src/sources/balancer_v2/pools/composable_stable.rs +++ b/crates/shared/src/sources/balancer_v2/pools/composable_stable.rs @@ -97,91 +97,9 @@ mod tests { use { super::*, crate::sources::balancer_v2::graph_api::Token, - contracts::dummy_contract, ethcontract::{H160, H256}, - ethcontract_mock::Mock, - futures::future, - maplit::btreemap, }; - #[tokio::test] - async fn fetch_pool_state() { - let tokens = btreemap! { - H160([1; 20]) => common::TokenState { - balance: bfp!("1000.0").as_uint256(), - scaling_factor: Bfp::exp10(0), - }, - H160([2; 20]) => common::TokenState { - balance: bfp!("10.0").as_uint256(), - scaling_factor: bfp!("1.137117595629065656"), - }, - H160([3; 20]) => common::TokenState { - balance: 15_000_000.into(), - scaling_factor: Bfp::exp10(12), - }, - }; - let swap_fee = bfp!("0.00015"); - let amplification_parameter = - AmplificationParameter::new(200.into(), 10000.into()).unwrap(); - - let mock = Mock::new(42); - let web3 = mock.web3(); - - let pool = mock.deploy(BalancerV2ComposableStablePool::raw_contract().abi.clone()); - pool.expect_call( - BalancerV2ComposableStablePool::signatures().get_amplification_parameter(), - ) - .returns(( - amplification_parameter.factor(), - false, - amplification_parameter.precision(), - )); - pool.expect_call(BalancerV2ComposableStablePool::signatures().get_scaling_factors()) - .returns( - tokens - .values() - .map(|token| token.scaling_factor.as_uint256()) - .collect(), - ); - - let factory = dummy_contract!(BalancerV2ComposableStablePoolFactory, H160::default()); - let pool_info = PoolInfo { - common: common::PoolInfo { - id: H256([0x90; 32]), - address: pool.address(), - tokens: tokens.keys().copied().collect(), - scaling_factors: tokens.values().map(|token| token.scaling_factor).collect(), - block_created: 1337, - }, - }; - let common_pool_state = common::PoolState { - paused: false, - swap_fee, - tokens: tokens.clone(), - }; - - let pool_state = { - let block = web3.eth().block_number().await.unwrap(); - - let pool_state = factory.fetch_pool_state( - &pool_info, - future::ready(common_pool_state.clone()).boxed(), - block.into(), - ); - - pool_state.await.unwrap() - }; - - assert_eq!( - pool_state, - Some(PoolState { - tokens, - swap_fee, - amplification_parameter, - }) - ); - } - #[test] fn errors_when_converting_wrong_pool_type() { let pool = PoolData { diff --git a/crates/shared/src/sources/balancer_v2/pools/liquidity_bootstrapping.rs b/crates/shared/src/sources/balancer_v2/pools/liquidity_bootstrapping.rs index 9a37b9c886..065f436d57 100644 --- a/crates/shared/src/sources/balancer_v2/pools/liquidity_bootstrapping.rs +++ b/crates/shared/src/sources/balancer_v2/pools/liquidity_bootstrapping.rs @@ -102,161 +102,9 @@ mod tests { use { super::*, crate::sources::balancer_v2::graph_api::Token, - contracts::dummy_contract, ethcontract::{H160, H256}, - ethcontract_mock::Mock, - futures::future, - maplit::btreemap, }; - #[tokio::test] - async fn fetch_pool_state() { - let tokens = btreemap! { - H160([1; 20]) => TokenState { - common: common::TokenState { - balance: bfp!("1000.0").as_uint256(), - scaling_factor: Bfp::exp10(0), - }, - weight: bfp!("0.5"), - }, - H160([2; 20]) => TokenState { - common: common::TokenState { - balance: bfp!("10.0").as_uint256(), - scaling_factor: Bfp::exp10(0), - }, - weight: bfp!("0.3"), - }, - H160([3; 20]) => TokenState { - common: common::TokenState { - balance: 15_000_000.into(), - scaling_factor: Bfp::exp10(12), - }, - weight: bfp!("0.2"), - }, - }; - let swap_fee = bfp!("0.00015"); - - let mock = Mock::new(42); - let web3 = mock.web3(); - - let pool = mock.deploy( - BalancerV2LiquidityBootstrappingPool::raw_contract() - .abi - .clone(), - ); - pool.expect_call( - BalancerV2LiquidityBootstrappingPool::signatures().get_normalized_weights(), - ) - .returns( - tokens - .values() - .map(|token| token.weight.as_uint256()) - .collect(), - ); - pool.expect_call(BalancerV2LiquidityBootstrappingPool::signatures().get_swap_enabled()) - .returns(true); - - let factory = dummy_contract!(BalancerV2LiquidityBootstrappingPoolFactory, H160::default()); - let pool_info = PoolInfo { - common: common::PoolInfo { - id: H256([0x90; 32]), - address: pool.address(), - tokens: tokens.keys().copied().collect(), - scaling_factors: tokens - .values() - .map(|token| token.common.scaling_factor) - .collect(), - block_created: 1337, - }, - }; - let common_pool_state = common::PoolState { - paused: false, - swap_fee, - tokens: tokens - .iter() - .map(|(address, token)| (*address, token.common)) - .collect(), - }; - - let pool_state = { - let block = web3.eth().block_number().await.unwrap(); - - let pool_state = factory.fetch_pool_state( - &pool_info, - future::ready(common_pool_state.clone()).boxed(), - block.into(), - ); - - pool_state.await.unwrap() - }; - - assert_eq!( - pool_state, - Some(PoolState { - tokens, - swap_fee, - version: Version::V0 - }) - ); - } - - #[tokio::test] - async fn returns_none_if_swaps_disabled() { - let mock = Mock::new(42); - let web3 = mock.web3(); - - let pool = mock.deploy( - BalancerV2LiquidityBootstrappingPool::raw_contract() - .abi - .clone(), - ); - pool.expect_call( - BalancerV2LiquidityBootstrappingPool::signatures().get_normalized_weights(), - ) - .returns(vec![bfp!("0.5").as_uint256(), bfp!("0.5").as_uint256()]); - pool.expect_call(BalancerV2LiquidityBootstrappingPool::signatures().get_swap_enabled()) - .returns(false); - - let factory = dummy_contract!(BalancerV2LiquidityBootstrappingPoolFactory, H160::default()); - let pool_info = PoolInfo { - common: common::PoolInfo { - id: H256([0x90; 32]), - address: pool.address(), - tokens: vec![H160([1; 20]), H160([1; 20])], - scaling_factors: vec![1.into(), 1.into()], - block_created: 1337, - }, - }; - let common_pool_state = common::PoolState { - paused: false, - swap_fee: Bfp::zero(), - tokens: btreemap! { - H160([1; 20]) => common::TokenState { - balance: 0.into(), - scaling_factor: Bfp::exp10(0), - }, - H160([1; 20]) => common::TokenState { - balance: 0.into(), - scaling_factor: Bfp::exp10(0), - }, - }, - }; - - let pool_state = { - let block = web3.eth().block_number().await.unwrap(); - - let pool_state = factory.fetch_pool_state( - &pool_info, - future::ready(common_pool_state.clone()).boxed(), - block.into(), - ); - - pool_state.await.unwrap() - }; - - assert_eq!(pool_state, None); - } - #[test] fn errors_when_converting_wrong_pool_type() { let pool = PoolData { diff --git a/crates/shared/src/sources/balancer_v2/pools/stable.rs b/crates/shared/src/sources/balancer_v2/pools/stable.rs index 2c21222dbc..4e38fd3c24 100644 --- a/crates/shared/src/sources/balancer_v2/pools/stable.rs +++ b/crates/shared/src/sources/balancer_v2/pools/stable.rs @@ -124,82 +124,9 @@ mod tests { use { super::*, crate::sources::balancer_v2::graph_api::Token, - contracts::dummy_contract, ethcontract::{H160, H256}, - ethcontract_mock::Mock, - futures::future, - maplit::btreemap, }; - #[tokio::test] - async fn fetch_pool_state() { - let tokens = btreemap! { - H160([1; 20]) => common::TokenState { - balance: bfp!("1000.0").as_uint256(), - scaling_factor: Bfp::exp10(0), - }, - H160([2; 20]) => common::TokenState { - balance: bfp!("10.0").as_uint256(), - scaling_factor: Bfp::exp10(0), - }, - H160([3; 20]) => common::TokenState { - balance: 15_000_000.into(), - scaling_factor: Bfp::exp10(12), - }, - }; - let swap_fee = bfp!("0.00015"); - let amplification_parameter = - AmplificationParameter::new(200.into(), 10000.into()).unwrap(); - - let mock = Mock::new(42); - let web3 = mock.web3(); - - let pool = mock.deploy(BalancerV2StablePool::raw_contract().abi.clone()); - pool.expect_call(BalancerV2StablePool::signatures().get_amplification_parameter()) - .returns(( - amplification_parameter.factor, - false, - amplification_parameter.precision, - )); - - let factory = dummy_contract!(BalancerV2StablePoolFactoryV2, H160::default()); - let pool_info = PoolInfo { - common: common::PoolInfo { - id: H256([0x90; 32]), - address: pool.address(), - tokens: tokens.keys().copied().collect(), - scaling_factors: tokens.values().map(|token| token.scaling_factor).collect(), - block_created: 1337, - }, - }; - let common_pool_state = common::PoolState { - paused: false, - swap_fee, - tokens, - }; - - let pool_state = { - let block = web3.eth().block_number().await.unwrap(); - - let pool_state = factory.fetch_pool_state( - &pool_info, - future::ready(common_pool_state.clone()).boxed(), - block.into(), - ); - - pool_state.await.unwrap() - }; - - assert_eq!( - pool_state, - Some(PoolState { - tokens: common_pool_state.tokens, - swap_fee, - amplification_parameter, - }) - ); - } - #[test] fn errors_when_converting_wrong_pool_type() { let pool = PoolData { From b6d9d10b335640f46314ad67a0529f6fe6d702a2 Mon Sep 17 00:00:00 2001 From: Martin Beckmann Date: Fri, 17 Nov 2023 17:23:06 +0300 Subject: [PATCH 07/20] Share liquidity fetch requests for identical keys --- crates/shared/src/recent_block_cache.rs | 41 ++++++++++++++++--------- 1 file changed, 27 insertions(+), 14 deletions(-) diff --git a/crates/shared/src/recent_block_cache.rs b/crates/shared/src/recent_block_cache.rs index 8090c0d6b7..3fadb72629 100644 --- a/crates/shared/src/recent_block_cache.rs +++ b/crates/shared/src/recent_block_cache.rs @@ -25,29 +25,31 @@ //! could simplify this module if it was only used by by the former. use { - anyhow::Result, + crate::request_sharing::BoxRequestSharing, + anyhow::{Context, Result}, cached::{Cached, SizedCache}, ethcontract::BlockNumber, ethrpc::current_block::CurrentBlockStream, + futures::FutureExt, prometheus::IntCounterVec, std::{ cmp, collections::{hash_map::Entry, BTreeMap, HashMap, HashSet}, hash::Hash, num::{NonZeroU64, NonZeroUsize}, - sync::Mutex, + sync::{Arc, Mutex}, time::Duration, }, }; /// A trait used to define `RecentBlockCache` updating behaviour. #[async_trait::async_trait] -pub trait CacheFetching { +pub trait CacheFetching: Send + Sync + 'static { async fn fetch_values(&self, keys: HashSet, block: Block) -> Result>; } /// A trait used for `RecentBlockCache` keys. -pub trait CacheKey: Clone + Eq + Hash + Ord { +pub trait CacheKey: Clone + Eq + Hash + Ord + Send + Sync + 'static { /// Returns the smallest possible value for this type's `std::cmp::Ord` /// implementation. fn first_ord() -> Self; @@ -87,12 +89,13 @@ where { mutexed: Mutex>, number_of_blocks_to_cache: NonZeroU64, - fetcher: F, + fetcher: Arc, block_stream: CurrentBlockStream, maximum_retries: u32, delay_between_retries: Duration, metrics: &'static Metrics, metrics_label: &'static str, + requests: BoxRequestSharing<(HashSet, Block), Option>>, } #[derive(Clone, Copy, Debug)] @@ -158,12 +161,13 @@ where config.maximum_recent_block_age, )), number_of_blocks_to_cache: config.number_of_blocks_to_cache, - fetcher, + fetcher: Arc::new(fetcher), block_stream, maximum_retries: config.max_retries, delay_between_retries: config.delay_between_retries, metrics: Metrics::instance(observe::metrics::get_storage_registry()).unwrap(), metrics_label, + requests: BoxRequestSharing::labelled("liquidity_fetching".into()), }) } @@ -198,15 +202,24 @@ where // hasn't seen the block yet. As a workaround we repeat the request up to N // times while sleeping in between. async fn fetch_inner(&self, keys: HashSet, block: Block) -> Result> { - let fetch = || self.fetcher.fetch_values(keys.clone(), block); - for _ in 0..self.maximum_retries { - match fetch().await { - Ok(values) => return Ok(values), - Err(err) => tracing::warn!("retrying fetch because error: {:?}", err), + let retries = self.maximum_retries; + let delay = self.delay_between_retries; + let fetcher = self.fetcher.clone(); + let fut = self.requests.shared_or_else((keys, block), |entry| { + let (keys, block) = entry.clone(); + async move { + for _ in 0..=retries { + match fetcher.fetch_values(keys.clone(), block).await { + Ok(values) => return Some(values), + Err(err) => tracing::warn!("retrying fetch because error: {:?}", err), + } + tokio::time::sleep(delay).await; + } + None } - tokio::time::sleep(self.delay_between_retries).await; - } - fetch().await + .boxed() + }); + fut.await.context("could not fetch liquidity") } pub async fn fetch(&self, keys: impl IntoIterator, block: Block) -> Result> { From 094500034011b278e653fc484fbf7d977363b47c Mon Sep 17 00:00:00 2001 From: Martin Beckmann Date: Fri, 17 Nov 2023 17:39:33 +0300 Subject: [PATCH 08/20] Introduce request sharing in the liquidity cache level --- crates/shared/src/recent_block_cache.rs | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/crates/shared/src/recent_block_cache.rs b/crates/shared/src/recent_block_cache.rs index 3fadb72629..aa8ecd9c99 100644 --- a/crates/shared/src/recent_block_cache.rs +++ b/crates/shared/src/recent_block_cache.rs @@ -205,7 +205,9 @@ where let retries = self.maximum_retries; let delay = self.delay_between_retries; let fetcher = self.fetcher.clone(); + let mut created = false; let fut = self.requests.shared_or_else((keys, block), |entry| { + created = true; let (keys, block) = entry.clone(); async move { for _ in 0..=retries { @@ -219,6 +221,9 @@ where } .boxed() }); + if !created { + tracing::error!("shared existing fetch task"); + } fut.await.context("could not fetch liquidity") } From 1adccd58c0e68e61e2164e9c48dad459d409a416 Mon Sep 17 00:00:00 2001 From: Martin Beckmann Date: Fri, 17 Nov 2023 20:42:31 +0300 Subject: [PATCH 09/20] Share individual requests instead of batches --- crates/driver/src/boundary/liquidity/mod.rs | 2 +- crates/shared/src/recent_block_cache.rs | 87 ++++++++++++++------- 2 files changed, 60 insertions(+), 29 deletions(-) diff --git a/crates/driver/src/boundary/liquidity/mod.rs b/crates/driver/src/boundary/liquidity/mod.rs index 27c2ff4e7d..b119382f34 100644 --- a/crates/driver/src/boundary/liquidity/mod.rs +++ b/crates/driver/src/boundary/liquidity/mod.rs @@ -38,7 +38,7 @@ const BLOCK_POLL_INTERVAL: Duration = Duration::from_secs(1); fn cache_config() -> CacheConfig { CacheConfig { number_of_blocks_to_cache: NonZeroU64::new(10).unwrap(), - number_of_entries_to_auto_update: NonZeroUsize::new(200).unwrap(), + number_of_entries_to_auto_update: NonZeroUsize::new(1000).unwrap(), maximum_recent_block_age: 4, max_retries: 5, delay_between_retries: Duration::from_secs(1), diff --git a/crates/shared/src/recent_block_cache.rs b/crates/shared/src/recent_block_cache.rs index aa8ecd9c99..21a8ca9967 100644 --- a/crates/shared/src/recent_block_cache.rs +++ b/crates/shared/src/recent_block_cache.rs @@ -34,7 +34,7 @@ use { prometheus::IntCounterVec, std::{ cmp, - collections::{hash_map::Entry, BTreeMap, HashMap, HashSet}, + collections::{hash_map::Entry, BTreeMap, BTreeSet, HashMap, HashSet}, hash::Hash, num::{NonZeroU64, NonZeroUsize}, sync::{Arc, Mutex}, @@ -95,7 +95,7 @@ where delay_between_retries: Duration, metrics: &'static Metrics, metrics_label: &'static str, - requests: BoxRequestSharing<(HashSet, Block), Option>>, + requests: BoxRequestSharing<(K, Block), Option>>, } #[derive(Clone, Copy, Debug)] @@ -184,34 +184,57 @@ where .keys_of_recently_used_entries() .collect::>(); tracing::debug!("automatically updating {} entries", keys.len()); - let entries = self - .fetch_inner(keys.clone(), Block::Number(new_block)) + let (found_values, _) = self + .fetch_inner_many(keys.clone(), Block::Number(new_block)) .await?; - { - let mut mutexed = self.mutexed.lock().unwrap(); - mutexed.insert(new_block, keys.into_iter(), entries); - let oldest_to_keep = new_block.saturating_sub(self.number_of_blocks_to_cache.get() - 1); - mutexed.remove_cached_blocks_older_than(oldest_to_keep); - mutexed.last_update_block = new_block; - } + + let mut mutexed = self.mutexed.lock().unwrap(); + mutexed.insert(new_block, keys.into_iter(), found_values); + let oldest_to_keep = new_block.saturating_sub(self.number_of_blocks_to_cache.get() - 1); + mutexed.remove_cached_blocks_older_than(oldest_to_keep); + mutexed.last_update_block = new_block; + Ok(()) } + async fn fetch_inner_many(&self, keys: HashSet, block: Block) -> Result<(Vec, Vec)> { + let fetched = + futures::future::join_all(keys.iter().map(|key| self.fetch_inner(key.clone(), block))) + .await; + let found_keys: Vec<_> = fetched + .iter() + .zip(keys.iter()) + .filter_map(|(results, key)| { + results + .as_ref() + .is_ok_and(|res| !res.is_empty()) + .then_some(key.clone()) + }) + .collect(); + let fetched: Vec<_> = fetched + .into_iter() + .filter_map(|res| res.ok()) + .flatten() + .collect(); + Ok((fetched, found_keys)) + } + // Sometimes nodes requests error when we try to get state from what we think is // the current block when the node has been load balanced out to one that // hasn't seen the block yet. As a workaround we repeat the request up to N // times while sleeping in between. - async fn fetch_inner(&self, keys: HashSet, block: Block) -> Result> { + async fn fetch_inner(&self, key: K, block: Block) -> Result> { let retries = self.maximum_retries; let delay = self.delay_between_retries; let fetcher = self.fetcher.clone(); let mut created = false; - let fut = self.requests.shared_or_else((keys, block), |entry| { + let fut = self.requests.shared_or_else((key, block), |entry| { created = true; - let (keys, block) = entry.clone(); + let (key, block) = entry.clone(); async move { for _ in 0..=retries { - match fetcher.fetch_values(keys.clone(), block).await { + let keys = [key.clone()].into(); + match fetcher.fetch_values(keys, block).await { Ok(values) => return Some(values), Err(err) => tracing::warn!("retrying fetch because error: {:?}", err), } @@ -221,9 +244,6 @@ where } .boxed() }); - if !created { - tracing::error!("shared existing fetch task"); - } fut.await.context("could not fetch liquidity") } @@ -235,7 +255,9 @@ where let mut cache_hit_count = 0usize; let mut cache_hits = Vec::new(); - let mut cache_misses = HashSet::new(); + // Use BTreeSet to ensure deterministic iteration order later which enables + // request sharing. + let mut cache_misses = BTreeSet::new(); let last_update_block; { let mut mutexed = self.mutexed.lock().unwrap(); @@ -267,15 +289,21 @@ where } let cache_miss_block = block.unwrap_or(last_update_block); - let uncached_values = self - .fetch_inner(cache_misses.clone(), Block::Number(cache_miss_block)) - .await?; - - cache_hits.extend_from_slice(&uncached_values); + let cache_misses: Vec<_> = cache_misses.into_iter().collect(); + // Splits fetches into chunks because we can get over 1400 requests when the + // cache is empty which tend to time out if we don't chunk them. + for chunk in cache_misses.chunks(200) { + let keys = chunk.iter().cloned().collect(); + let (fetched, found_keys) = self + .fetch_inner_many(keys, Block::Number(cache_miss_block)) + .await?; + cache_hits.extend_from_slice(&fetched); - { let mut mutexed = self.mutexed.lock().unwrap(); - mutexed.insert(cache_miss_block, cache_misses.into_iter(), uncached_values); + mutexed.insert(cache_miss_block, chunk.iter().cloned(), fetched); + for key in found_keys { + mutexed.recently_used.cache_set(key, ()); + } } Ok(cache_hits) @@ -317,7 +345,6 @@ where } fn get(&mut self, key: K, block: Option) -> Option<&[V]> { - self.recently_used.cache_set(key.clone(), ()); let block = block.or_else(|| { self.cached_most_recently_at_block .get(&key) @@ -326,7 +353,11 @@ where self.last_update_block.saturating_sub(block) <= self.maximum_recent_block_age }) })?; - self.entries.get(&(block, key)).map(Vec::as_slice) + let result = self.entries.get(&(block, key.clone())).map(Vec::as_slice); + if result.is_some_and(|values| !values.is_empty()) { + self.recently_used.cache_set(key, ()); + } + result } fn insert( From a4530f54e9983fb1484aa565362556be04862509 Mon Sep 17 00:00:00 2001 From: Martin Beckmann Date: Sat, 18 Nov 2023 01:59:16 +0300 Subject: [PATCH 10/20] Move request sharing GC to background task --- crates/shared/src/recent_block_cache.rs | 2 +- crates/shared/src/request_sharing.rs | 52 +++++++++++++--------- crates/shared/src/trade_finding/oneinch.rs | 2 +- 3 files changed, 32 insertions(+), 24 deletions(-) diff --git a/crates/shared/src/recent_block_cache.rs b/crates/shared/src/recent_block_cache.rs index 21a8ca9967..8035139fd1 100644 --- a/crates/shared/src/recent_block_cache.rs +++ b/crates/shared/src/recent_block_cache.rs @@ -133,7 +133,7 @@ struct Metrics { impl RecentBlockCache where K: CacheKey, - V: Clone, + V: Clone + Send + Sync + 'static, F: CacheFetching, { /// number_of_blocks_to_cache: Previous blocks stay cached until the block diff --git a/crates/shared/src/request_sharing.rs b/crates/shared/src/request_sharing.rs index 2ba6bed58c..9d56d7bbd2 100644 --- a/crates/shared/src/request_sharing.rs +++ b/crates/shared/src/request_sharing.rs @@ -4,7 +4,12 @@ use { FutureExt, }, prometheus::IntCounterVec, - std::{future::Future, sync::Mutex}, + std::{ + collections::HashMap, + future::Future, + hash::Hash, + sync::{Arc, Mutex}, + }, }; // The design of this module is intentionally simple. Every time a shared future @@ -19,7 +24,7 @@ use { /// Share an expensive to compute response with multiple requests that occur /// while one of them is already in flight. pub struct RequestSharing { - in_flight: Mutex)>>, + in_flight: Arc>>>, request_label: String, } @@ -30,13 +35,31 @@ pub type BoxRequestSharing = /// A boxed shared future. pub type BoxShared = Shared>; -impl RequestSharing { +type Cache = Arc>>>; + +impl RequestSharing +where + Fut::Output: Send + Sync, +{ pub fn labelled(request_label: String) -> Self { + let cache: Cache = Default::default(); + Self::spawn_gc(cache.clone()); Self { - in_flight: Default::default(), + in_flight: cache, request_label, } } + + fn spawn_gc(cache: Cache) { + let rt = tokio::runtime::Handle::current(); + tokio::task::spawn_blocking(move || { + { + let mut cache = cache.lock().unwrap(); + cache.retain(|_request, weak| weak.upgrade().is_some()); + } + rt.block_on(tokio::time::sleep(tokio::time::Duration::from_millis(500))) + }); + } } /// Returns a shallow copy (without any pending requests) @@ -51,7 +74,7 @@ impl Clone for RequestSharing { impl RequestSharing where - Request: Eq, + Request: Eq + Hash, Fut: Future, Fut::Output: Clone, { @@ -81,22 +104,7 @@ where let mut in_flight = self.in_flight.lock().unwrap(); // collect garbage and find copy of existing request - let mut existing = None; - in_flight.retain(|(request_, weak)| match weak.upgrade() { - // NOTE: Technically it's possible under very specific circumstances that the - // `active_request` is sitting in the cache for a long time without making progress. - // If somebody else picks it up and polls it to completion a timeout error will most - // likely be the result. See https://github.com/gnosis/gp-v2-services/pull/1677#discussion_r813673692 - // for more details. - Some(shared) if shared.peek().is_none() => { - if *request_ == request { - debug_assert!(existing.is_none()); - existing = Some(shared); - } - true - } - _ => false, - }); + let existing = in_flight.get(&request).and_then(WeakShared::upgrade); if let Some(existing) = existing { Metrics::get() @@ -114,7 +122,7 @@ where let shared = future(&request).shared(); // unwrap because downgrade only returns None if the Shared has already // completed which cannot be the case because we haven't polled it yet. - in_flight.push((request, shared.downgrade().unwrap())); + in_flight.insert(request, shared.downgrade().unwrap()); shared } } diff --git a/crates/shared/src/trade_finding/oneinch.rs b/crates/shared/src/trade_finding/oneinch.rs index 62c9c71d05..82994f975d 100644 --- a/crates/shared/src/trade_finding/oneinch.rs +++ b/crates/shared/src/trade_finding/oneinch.rs @@ -35,7 +35,7 @@ struct Inner { settlement_contract: H160, } -#[derive(Clone, Eq, PartialEq)] +#[derive(Clone, Eq, PartialEq, Hash)] struct InternalQuery { data: Query, allowed_protocols: Option>, From b2eb99df118688a902a649d22b2d41cff2dd35cd Mon Sep 17 00:00:00 2001 From: Martin Beckmann Date: Sat, 18 Nov 2023 03:12:59 +0300 Subject: [PATCH 11/20] Cache initialized ERC20 and uniswap contracts --- .../src/sources/uniswap_v2/pool_fetching.rs | 36 +++++++++++++++---- 1 file changed, 30 insertions(+), 6 deletions(-) diff --git a/crates/shared/src/sources/uniswap_v2/pool_fetching.rs b/crates/shared/src/sources/uniswap_v2/pool_fetching.rs index 754971b72d..44da49a12d 100644 --- a/crates/shared/src/sources/uniswap_v2/pool_fetching.rs +++ b/crates/shared/src/sources/uniswap_v2/pool_fetching.rs @@ -15,7 +15,10 @@ use { }, model::TokenPair, num::rational::Ratio, - std::{collections::HashSet, sync::RwLock}, + std::{ + collections::{HashMap, HashSet}, + sync::{Arc, RwLock}, + }, }; const POOL_SWAP_GAS_COST: usize = 60_000; @@ -250,16 +253,37 @@ pub struct DefaultPoolReader { pub web3: Web3, } +lazy_static::lazy_static! { + static ref AMMS: RwLock>> = RwLock::new(Default::default()); + static ref TOKENS: RwLock>> = RwLock::new(Default::default()); +} + +macro_rules! get_or_init { + ($contract:ident, $cache:expr, $address:expr, $web3:expr) => {{ + let contract = $cache.read().unwrap().get($address).cloned(); + match contract { + Some(contract) => contract, + None => { + let mut cache = $cache.write().unwrap(); + let entry = cache + .entry($address.clone()) + .or_insert_with(|| Arc::new($contract::at($web3, $address.clone()))); + Arc::clone(entry) + } + } + }}; +} + impl PoolReading for DefaultPoolReader { fn read_state(&self, pair: TokenPair, block: BlockId) -> BoxFuture<'_, Result>> { let pair_address = self.pair_provider.pair_address(&pair); - let pair_contract = IUniswapLikePair::at(&self.web3, pair_address); - - // Fetch ERC20 token balances of the pools to sanity check with reserves - let token0 = ERC20::at(&self.web3, pair.get().0); - let token1 = ERC20::at(&self.web3, pair.get().1); + let pair_contract = get_or_init!(IUniswapLikePair, AMMS, &pair_address, &self.web3); let fetch_reserves = pair_contract.get_reserves().block(block).call(); + + // Fetch ERC20 token balances of the pools to sanity check with reserves + let token0 = get_or_init!(ERC20, TOKENS, &pair.get().0, &self.web3); + let token1 = get_or_init!(ERC20, TOKENS, &pair.get().1, &self.web3); let fetch_token0_balance = token0.balance_of(pair_address).block(block).call(); let fetch_token1_balance = token1.balance_of(pair_address).block(block).call(); From edcb0bacb826e761e4ae7bc5d2bef1ce30921141 Mon Sep 17 00:00:00 2001 From: Martin Beckmann Date: Sat, 18 Nov 2023 03:35:21 +0300 Subject: [PATCH 12/20] Cache balancer pool contract instances --- crates/shared/src/sources/balancer_v2/pools.rs | 17 +++++++++++++++++ .../src/sources/balancer_v2/pools/common.rs | 18 +++++++++++++----- .../balancer_v2/pools/composable_stable.rs | 18 ++++++++++++++---- .../pools/liquidity_bootstrapping.rs | 18 ++++++++++++++---- .../src/sources/balancer_v2/pools/stable.rs | 17 ++++++++++++++--- .../src/sources/balancer_v2/pools/weighted.rs | 16 ++++++++++++++-- 6 files changed, 86 insertions(+), 18 deletions(-) diff --git a/crates/shared/src/sources/balancer_v2/pools.rs b/crates/shared/src/sources/balancer_v2/pools.rs index 4d7eb26f15..4af368271e 100644 --- a/crates/shared/src/sources/balancer_v2/pools.rs +++ b/crates/shared/src/sources/balancer_v2/pools.rs @@ -129,3 +129,20 @@ pub trait PoolIndexing: Clone + Send + Sync + 'static { /// Gets the common pool data. fn common(&self) -> &common::PoolInfo; } + +#[macro_export] +macro_rules! get_or_init { + ($contract:ident, $cache:expr, $address:expr, $web3:expr) => {{ + let contract = $cache.read().unwrap().get($address).cloned(); + match contract { + Some(contract) => contract, + None => { + let mut cache = $cache.write().unwrap(); + let entry = cache + .entry($address.clone()) + .or_insert_with(|| Arc::new($contract::at($web3, $address.clone()))); + Arc::clone(entry) + } + } + }}; +} diff --git a/crates/shared/src/sources/balancer_v2/pools/common.rs b/crates/shared/src/sources/balancer_v2/pools/common.rs index bbe3b88279..d78d5c9338 100644 --- a/crates/shared/src/sources/balancer_v2/pools/common.rs +++ b/crates/shared/src/sources/balancer_v2/pools/common.rs @@ -13,7 +13,11 @@ use { contracts::{BalancerV2BasePool, BalancerV2Vault}, ethcontract::{BlockId, Bytes, H160, H256, U256}, futures::{future::BoxFuture, FutureExt as _, TryFutureExt}, - std::{collections::BTreeMap, future::Future, sync::Arc}, + std::{ + collections::{BTreeMap, HashMap}, + future::Future, + sync::{Arc, RwLock}, + }, tokio::sync::oneshot, }; @@ -45,6 +49,10 @@ pub struct PoolInfoFetcher { token_infos: Arc, } +lazy_static::lazy_static! { + static ref POOLS: RwLock>> = RwLock::new(Default::default()); +} + impl PoolInfoFetcher { pub fn new( vault: BalancerV2Vault, @@ -59,9 +67,9 @@ impl PoolInfoFetcher { } /// Returns a Balancer base pool contract instance at the specified address. - fn base_pool_at(&self, pool_address: H160) -> BalancerV2BasePool { + fn base_pool_at(&self, pool_address: &H160) -> Arc { let web3 = self.vault.raw_instance().web3(); - BalancerV2BasePool::at(&web3, pool_address) + crate::get_or_init!(BalancerV2BasePool, POOLS, pool_address, &web3) } /// Retrieves the scaling exponents for the specified tokens. @@ -85,7 +93,7 @@ impl PoolInfoFetcher { pool_address: H160, block_created: u64, ) -> Result { - let pool = self.base_pool_at(pool_address); + let pool = self.base_pool_at(&pool_address); let pool_id = H256(pool.methods().get_pool_id().call().await?.0); let (tokens, _, _) = self @@ -110,7 +118,7 @@ impl PoolInfoFetcher { pool: &PoolInfo, block: BlockId, ) -> BoxFuture<'static, Result> { - let pool_contract = self.base_pool_at(pool.address); + let pool_contract = self.base_pool_at(&pool.address); let fetch_paused = pool_contract .get_paused_state() .block(block) diff --git a/crates/shared/src/sources/balancer_v2/pools/composable_stable.rs b/crates/shared/src/sources/balancer_v2/pools/composable_stable.rs index 927ed0c8a9..0f1f8e5fc4 100644 --- a/crates/shared/src/sources/balancer_v2/pools/composable_stable.rs +++ b/crates/shared/src/sources/balancer_v2/pools/composable_stable.rs @@ -8,8 +8,12 @@ use { }, anyhow::Result, contracts::{BalancerV2ComposableStablePool, BalancerV2ComposableStablePoolFactory}, - ethcontract::BlockId, + ethcontract::{BlockId, H160}, futures::{future::BoxFuture, FutureExt as _}, + std::{ + collections::HashMap, + sync::{Arc, RwLock}, + }, }; pub use super::stable::{AmplificationParameter, PoolState}; @@ -31,6 +35,10 @@ impl PoolIndexing for PoolInfo { } } +lazy_static::lazy_static! { + static ref POOLS: RwLock>> = RwLock::new(Default::default()); +} + #[async_trait::async_trait] impl FactoryIndexing for BalancerV2ComposableStablePoolFactory { type PoolInfo = PoolInfo; @@ -46,9 +54,11 @@ impl FactoryIndexing for BalancerV2ComposableStablePoolFactory { common_pool_state: BoxFuture<'static, common::PoolState>, block: BlockId, ) -> BoxFuture<'static, Result>> { - let pool_contract = BalancerV2ComposableStablePool::at( - &self.raw_instance().web3(), - pool_info.common.address, + let pool_contract = crate::get_or_init!( + BalancerV2ComposableStablePool, + POOLS, + &pool_info.common.address, + &self.raw_instance().web3() ); let fetch_common = common_pool_state.map(Result::Ok); diff --git a/crates/shared/src/sources/balancer_v2/pools/liquidity_bootstrapping.rs b/crates/shared/src/sources/balancer_v2/pools/liquidity_bootstrapping.rs index 065f436d57..fa13b92e33 100644 --- a/crates/shared/src/sources/balancer_v2/pools/liquidity_bootstrapping.rs +++ b/crates/shared/src/sources/balancer_v2/pools/liquidity_bootstrapping.rs @@ -11,8 +11,12 @@ use { BalancerV2LiquidityBootstrappingPool, BalancerV2LiquidityBootstrappingPoolFactory, }, - ethcontract::BlockId, + ethcontract::{BlockId, H160}, futures::{future::BoxFuture, FutureExt as _}, + std::{ + collections::HashMap, + sync::{Arc, RwLock}, + }, }; pub use super::weighted::{PoolState, TokenState, Version}; @@ -38,6 +42,10 @@ impl PoolIndexing for PoolInfo { } } +lazy_static::lazy_static! { + static ref POOLS: RwLock>> = RwLock::new(Default::default()); +} + #[async_trait::async_trait] impl FactoryIndexing for BalancerV2LiquidityBootstrappingPoolFactory { type PoolInfo = PoolInfo; @@ -53,9 +61,11 @@ impl FactoryIndexing for BalancerV2LiquidityBootstrappingPoolFactory { common_pool_state: BoxFuture<'static, common::PoolState>, block: BlockId, ) -> BoxFuture<'static, Result>> { - let pool_contract = BalancerV2LiquidityBootstrappingPool::at( - &self.raw_instance().web3(), - pool_info.common.address, + let pool_contract = crate::get_or_init!( + BalancerV2LiquidityBootstrappingPool, + POOLS, + &pool_info.common.address, + &self.raw_instance().web3() ); let fetch_common = common_pool_state.map(Result::Ok); diff --git a/crates/shared/src/sources/balancer_v2/pools/stable.rs b/crates/shared/src/sources/balancer_v2/pools/stable.rs index 4e38fd3c24..88d28887a7 100644 --- a/crates/shared/src/sources/balancer_v2/pools/stable.rs +++ b/crates/shared/src/sources/balancer_v2/pools/stable.rs @@ -14,7 +14,10 @@ use { ethcontract::{BlockId, H160, U256}, futures::{future::BoxFuture, FutureExt as _}, num::BigRational, - std::collections::BTreeMap, + std::{ + collections::{BTreeMap, HashMap}, + sync::{Arc, RwLock}, + }, }; #[derive(Clone, Debug, Default, Eq, PartialEq)] @@ -77,6 +80,10 @@ impl AmplificationParameter { } } +lazy_static::lazy_static! { + static ref POOLS: RwLock>> = RwLock::new(Default::default()); +} + #[async_trait::async_trait] impl FactoryIndexing for BalancerV2StablePoolFactoryV2 { type PoolInfo = PoolInfo; @@ -92,8 +99,12 @@ impl FactoryIndexing for BalancerV2StablePoolFactoryV2 { common_pool_state: BoxFuture<'static, common::PoolState>, block: BlockId, ) -> BoxFuture<'static, Result>> { - let pool_contract = - BalancerV2StablePool::at(&self.raw_instance().web3(), pool_info.common.address); + let pool_contract = crate::get_or_init!( + BalancerV2StablePool, + POOLS, + &pool_info.common.address, + &self.raw_instance().web3() + ); let fetch_common = common_pool_state.map(Result::Ok); let fetch_amplification_parameter = pool_contract diff --git a/crates/shared/src/sources/balancer_v2/pools/weighted.rs b/crates/shared/src/sources/balancer_v2/pools/weighted.rs index 1cc821f65f..0fb007b0ed 100644 --- a/crates/shared/src/sources/balancer_v2/pools/weighted.rs +++ b/crates/shared/src/sources/balancer_v2/pools/weighted.rs @@ -14,7 +14,10 @@ use { }, ethcontract::{BlockId, H160}, futures::{future::BoxFuture, FutureExt as _}, - std::collections::BTreeMap, + std::{ + collections::{BTreeMap, HashMap}, + sync::{Arc, RwLock}, + }, }; #[derive(Clone, Debug, Default, Eq, PartialEq)] @@ -64,13 +67,22 @@ impl PoolIndexing for PoolInfo { } } +lazy_static::lazy_static! { + static ref POOLS: RwLock>> = RwLock::new(Default::default()); +} + #[async_trait::async_trait] impl FactoryIndexing for BalancerV2WeightedPoolFactory { type PoolInfo = PoolInfo; type PoolState = PoolState; async fn specialize_pool_info(&self, pool: common::PoolInfo) -> Result { - let pool_contract = BalancerV2WeightedPool::at(&self.raw_instance().web3(), pool.address); + let pool_contract = crate::get_or_init!( + BalancerV2WeightedPool, + POOLS, + &pool.address, + &self.raw_instance().web3() + ); let weights = pool_contract .methods() .get_normalized_weights() From 98267db115abdf20d05ae4e5fd97ed681cdc47d6 Mon Sep 17 00:00:00 2001 From: Martin Beckmann Date: Sat, 18 Nov 2023 12:54:35 +0300 Subject: [PATCH 13/20] Only instantiate ERC20 contracts once while fetching liquidity --- crates/driver/src/domain/competition/auction.rs | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/crates/driver/src/domain/competition/auction.rs b/crates/driver/src/domain/competition/auction.rs index ce9fc32dd0..645d242530 100644 --- a/crates/driver/src/domain/competition/auction.rs +++ b/crates/driver/src/domain/competition/auction.rs @@ -281,6 +281,7 @@ impl AuctionProcessor { /// Fetches the tradable balance for every order owner. async fn fetch_balances(ethereum: &infra::Ethereum, orders: &[order::Order]) -> Balances { + let mut tokens: HashMap<_, _> = Default::default(); // Collect trader/token/source/interaction tuples for fetching available // balances. Note that we are pessimistic here, if a trader is selling // the same token with the same source in two different orders using a @@ -295,6 +296,7 @@ impl AuctionProcessor { .map(|((trader, token, source), mut orders)| { let first = orders.next().expect("group contains at least 1 order"); let mut others = orders; + tokens.entry(token).or_insert_with(|| ethereum.erc20(token)); if others.all(|order| order.pre_interactions == first.pre_interactions) { (trader, token, source, &first.pre_interactions[..]) } else { @@ -306,16 +308,18 @@ impl AuctionProcessor { join_all( traders .into_iter() - .map(|(trader, token, source, interactions)| async move { - let balance = ethereum - .erc20(token) - .tradable_balance(trader.into(), source, interactions) - .await; + .map(|(trader, token, source, interactions)| { + let token_contract = tokens.get(&token); + let token_contract = token_contract.expect("all tokens where created earlier"); + let fetch_balance = token_contract.tradable_balance(trader.into(), source, interactions); + + async move { + let balance = fetch_balance.await; ( (trader, token, source), balance.map(order::SellAmount::from).ok(), ) - }), + }}), ) .await .into_iter() From 9a6db4d33912a5792b750f5f7e58daf0ba081261 Mon Sep 17 00:00:00 2001 From: ilya Date: Wed, 22 Nov 2023 17:44:38 +0000 Subject: [PATCH 14/20] DEX solvers rate limiter --- crates/solvers/src/boundary/mod.rs | 1 + crates/solvers/src/boundary/rate_limiter.rs | 60 ++++++++++ crates/solvers/src/domain/solver/dex/mod.rs | 110 +++++++++++++------ crates/solvers/src/infra/cli.rs | 21 ++++ crates/solvers/src/infra/dex/balancer/mod.rs | 11 +- crates/solvers/src/infra/dex/mod.rs | 5 + crates/solvers/src/infra/dex/oneinch/mod.rs | 9 +- crates/solvers/src/infra/dex/paraswap/mod.rs | 9 +- crates/solvers/src/infra/dex/zeroex/mod.rs | 3 + crates/solvers/src/run.rs | 33 ++++-- 10 files changed, 217 insertions(+), 45 deletions(-) create mode 100644 crates/solvers/src/boundary/rate_limiter.rs diff --git a/crates/solvers/src/boundary/mod.rs b/crates/solvers/src/boundary/mod.rs index 11315b5591..637ef43a1e 100644 --- a/crates/solvers/src/boundary/mod.rs +++ b/crates/solvers/src/boundary/mod.rs @@ -5,5 +5,6 @@ pub mod baseline; pub mod legacy; pub mod liquidity; pub mod naive; +pub mod rate_limiter; pub type Result = anyhow::Result; diff --git a/crates/solvers/src/boundary/rate_limiter.rs b/crates/solvers/src/boundary/rate_limiter.rs new file mode 100644 index 0000000000..afb0ab56d5 --- /dev/null +++ b/crates/solvers/src/boundary/rate_limiter.rs @@ -0,0 +1,60 @@ +use { + anyhow::Result, + shared::rate_limiter::{ + RateLimiter as SharedRateLimiter, + RateLimitingStrategy as SharedRateLimitingStrategy, + }, + std::{future::Future, str::FromStr}, + thiserror::Error, +}; + +pub struct RateLimiter { + inner: SharedRateLimiter, +} + +#[derive(Debug, Clone)] +pub struct RateLimitingStrategy { + inner: SharedRateLimitingStrategy, +} + +impl Default for RateLimitingStrategy { + fn default() -> Self { + Self { + inner: SharedRateLimitingStrategy::default(), + } + } +} + +impl FromStr for RateLimitingStrategy { + type Err = anyhow::Error; + + fn from_str(config: &str) -> Result { + SharedRateLimitingStrategy::from_str(config).map(|strategy| Self { inner: strategy }) + } +} + +#[derive(Error, Debug, Clone, Default)] +pub enum RateLimiterError { + #[default] + #[error("rate limited")] + RateLimited, +} + +impl RateLimiter { + pub fn new(strategy: RateLimitingStrategy, name: String) -> Self { + Self { + inner: SharedRateLimiter::from_strategy(strategy.inner, name), + } + } + + pub async fn execute( + &self, + task: impl Future, + requires_back_off: impl Fn(&T) -> bool, + ) -> Result { + self.inner + .execute(task, requires_back_off) + .await + .map_err(|_| RateLimiterError::RateLimited) + } +} diff --git a/crates/solvers/src/domain/solver/dex/mod.rs b/crates/solvers/src/domain/solver/dex/mod.rs index 65ceb69f69..f87712e6b6 100644 --- a/crates/solvers/src/domain/solver/dex/mod.rs +++ b/crates/solvers/src/domain/solver/dex/mod.rs @@ -3,12 +3,19 @@ use { crate::{ - domain, - domain::{auction, dex::slippage, order, solution, solver::dex::fills::Fills}, + boundary::rate_limiter::RateLimiter, + domain::{ + self, + auction, + dex::{self, slippage}, + order::{self, Order}, + solution, + solver::dex::fills::Fills, + }, infra, }, futures::{future, stream, FutureExt, StreamExt}, - std::num::NonZeroUsize, + std::{future::Future, num::NonZeroUsize, sync::Arc}, tracing::Instrument, }; @@ -33,10 +40,17 @@ pub struct Dex { /// Parameters used to calculate the revert risk of a solution. risk: domain::Risk, + + /// Handles 429 Too Many Requests error with a retry mechanism + rate_limiter: Arc, } impl Dex { - pub fn new(dex: infra::dex::Dex, config: infra::config::dex::Config) -> Self { + pub fn new( + dex: infra::dex::Dex, + config: infra::config::dex::Config, + rate_limiter: RateLimiter, + ) -> Self { Self { dex, simulator: infra::dex::Simulator::new( @@ -48,6 +62,7 @@ impl Dex { concurrent_requests: config.concurrent_requests, fills: Fills::new(config.smallest_partial_fill), risk: config.risk, + rate_limiter: Arc::new(rate_limiter), } } @@ -78,48 +93,75 @@ impl Dex { .enumerate() .map(|(i, order)| { let span = tracing::info_span!("solve", order = %order.get().uid); - self.solve_order(order, &auction.tokens, auction.gas_price) - .map(move |solution| solution.map(|s| s.with_id(solution::Id(i as u64)))) - .instrument(span) + self.solve_order( + order, + &auction.tokens, + auction.gas_price, + self.rate_limiter.clone(), + ) + .map(move |solution| solution.map(|s| s.with_id(solution::Id(i as u64)))) + .instrument(span) }) .buffer_unordered(self.concurrent_requests.get()) .filter_map(future::ready) } + async fn try_solve( + &self, + order: &Order, + dex_order: &dex::Order, + tokens: &auction::Tokens, + gas_price: auction::GasPrice, + ) -> Result { + let slippage = self.slippage.relative(&dex_order.amount(), tokens); + self.dex + .swap(dex_order, &slippage, tokens, gas_price) + .await + .map_err(|err| { + match &err { + err @ infra::dex::Error::NotFound => { + if order.partially_fillable { + // Only adjust the amount to try next if we are sure the API worked + // correctly yet still wasn't able to provide a + // swap. + self.fills.reduce_next_try(order.uid); + } else { + tracing::debug!(?err, "skipping order"); + } + } + err @ infra::dex::Error::OrderNotSupported => { + tracing::debug!(?err, "skipping order") + } + infra::dex::Error::Other(err) => tracing::warn!(?err, "failed to get swap"), + err @ infra::dex::Error::RateLimited => { + tracing::debug!(?err, "encountered rate limit, retrying") + } + } + err + }) + } + async fn solve_order( &self, order: order::UserOrder<'_>, tokens: &auction::Tokens, gas_price: auction::GasPrice, + rate_limiter: Arc, ) -> Option { let order = order.get(); - let swap = { - let order = self.fills.dex_order(order, tokens)?; - let slippage = self.slippage.relative(&order.amount(), tokens); - self.dex.swap(&order, &slippage, tokens, gas_price).await - }; - - let swap = match swap { - Ok(swap) => swap, - Err(err @ infra::dex::Error::NotFound) => { - if order.partially_fillable { - // Only adjust the amount to try next if we are sure the API worked correctly - // yet still wasn't able to provide a swap. - self.fills.reduce_next_try(order.uid); - } else { - tracing::debug!(?err, "skipping order"); - } - return None; - } - Err(err @ infra::dex::Error::OrderNotSupported) => { - tracing::debug!(?err, "skipping order"); - return None; - } - Err(infra::dex::Error::Other(err)) => { - tracing::warn!(?err, "failed to get swap"); - return None; - } - }; + let dex_order = self.fills.dex_order(order, tokens)?; + let swap = rate_limiter + .execute( + self.try_solve(order, &dex_order, tokens, gas_price), + |result| matches!(result, Err(infra::dex::Error::RateLimited)), + ) + .await + .map_err(|_| { + tracing::debug!("rate limit retries exceeded, unable to complete operation"); + infra::dex::Error::RateLimited + }) + .and_then(|result| result) + .ok()?; let uid = order.uid; let sell = tokens.reference_price(&order.sell.token); diff --git a/crates/solvers/src/infra/cli.rs b/crates/solvers/src/infra/cli.rs index 396b6c50aa..6447e6f737 100644 --- a/crates/solvers/src/infra/cli.rs +++ b/crates/solvers/src/infra/cli.rs @@ -1,6 +1,7 @@ //! CLI arguments for the `solvers` binary. use { + crate::boundary::rate_limiter::RateLimitingStrategy, clap::{Parser, Subcommand}, std::{net::SocketAddr, path::PathBuf}, }; @@ -23,6 +24,12 @@ pub struct Args { #[command(subcommand)] pub command: Command, + + /// Configures the back off strategy for single order solvers. Requests + /// issued while back off is active get dropped entirely. Expects + /// "= 1.0>,,". + #[clap(long, env)] + pub single_order_solver_rate_limiter: Option, } /// The solver engine to run. The config field is a path to the solver @@ -66,3 +73,17 @@ pub enum Command { config: PathBuf, }, } + +impl Command { + pub fn to_lowercase(&self) -> String { + match self { + Command::Baseline { .. } => "baseline".to_string(), + Command::Naive { .. } => "naive".to_string(), + Command::Legacy { .. } => "legacy".to_string(), + Command::Balancer { .. } => "balancer".to_string(), + Command::ZeroEx { .. } => "zeroex".to_string(), + Command::OneInch { .. } => "oneinch".to_string(), + Command::ParaSwap { .. } => "paraswap".to_string(), + } + } +} diff --git a/crates/solvers/src/infra/dex/balancer/mod.rs b/crates/solvers/src/infra/dex/balancer/mod.rs index 92962269f0..806495da1b 100644 --- a/crates/solvers/src/infra/dex/balancer/mod.rs +++ b/crates/solvers/src/infra/dex/balancer/mod.rs @@ -163,12 +163,21 @@ impl Sor { pub enum Error { #[error("no valid swap interaction could be found")] NotFound, + #[error("rate limited")] + RateLimited, #[error(transparent)] Http(util::http::Error), } impl From> for Error { fn from(err: util::http::RoundtripError) -> Self { - Self::Http(err.into()) + match err { + util::http::RoundtripError::Http(util::http::Error::Status(status_code, _)) + if status_code.as_u16() == 429 => + { + Self::RateLimited + } + other_err => Self::Http(other_err.into()), + } } } diff --git a/crates/solvers/src/infra/dex/mod.rs b/crates/solvers/src/infra/dex/mod.rs index 32dacaad4e..774a9405a1 100644 --- a/crates/solvers/src/infra/dex/mod.rs +++ b/crates/solvers/src/infra/dex/mod.rs @@ -46,6 +46,8 @@ pub enum Error { OrderNotSupported, #[error("no valid swap interaction could be found")] NotFound, + #[error("rate limit exceeded")] + RateLimited, #[error(transparent)] Other(Box), } @@ -64,6 +66,7 @@ impl From for Error { match err { oneinch::Error::OrderNotSupported => Self::OrderNotSupported, oneinch::Error::NotFound => Self::NotFound, + oneinch::Error::RateLimited => Self::RateLimited, _ => Self::Other(Box::new(err)), } } @@ -73,6 +76,7 @@ impl From for Error { fn from(err: zeroex::Error) -> Self { match err { zeroex::Error::NotFound => Self::NotFound, + zeroex::Error::RateLimited => Self::RateLimited, _ => Self::Other(Box::new(err)), } } @@ -82,6 +86,7 @@ impl From for Error { fn from(err: paraswap::Error) -> Self { match err { paraswap::Error::NotFound => Self::NotFound, + paraswap::Error::RateLimited => Self::RateLimited, _ => Self::Other(Box::new(err)), } } diff --git a/crates/solvers/src/infra/dex/oneinch/mod.rs b/crates/solvers/src/infra/dex/oneinch/mod.rs index 041ac4b38f..c9af82652b 100644 --- a/crates/solvers/src/infra/dex/oneinch/mod.rs +++ b/crates/solvers/src/infra/dex/oneinch/mod.rs @@ -165,6 +165,8 @@ pub enum Error { OrderNotSupported, #[error("no valid swap could be found")] NotFound, + #[error("rate limit exceeded")] + RateLimited, #[error("api error {code}: {description}")] Api { code: i32, description: String }, #[error(transparent)] @@ -174,7 +176,12 @@ pub enum Error { impl From> for Error { fn from(err: util::http::RoundtripError) -> Self { match err { - util::http::RoundtripError::Http(err) => Self::Http(err), + util::http::RoundtripError::Http(http_err) => match http_err { + util::http::Error::Status(status_code, _) if status_code.as_u16() == 429 => { + Self::RateLimited + } + other_err => Self::Http(other_err), + }, util::http::RoundtripError::Api(err) => { // Unfortunately, AFAIK these codes aren't documented anywhere. These // based on empirical observations of what the API has returned in the diff --git a/crates/solvers/src/infra/dex/paraswap/mod.rs b/crates/solvers/src/infra/dex/paraswap/mod.rs index 621dafd984..d4e87310e6 100644 --- a/crates/solvers/src/infra/dex/paraswap/mod.rs +++ b/crates/solvers/src/infra/dex/paraswap/mod.rs @@ -113,6 +113,8 @@ pub enum Error { NotFound, #[error("decimals are missing for the swapped tokens")] MissingDecimals, + #[error("rate limited")] + RateLimited, #[error("api error {0}")] Api(String), #[error(transparent)] @@ -122,7 +124,12 @@ pub enum Error { impl From> for Error { fn from(err: util::http::RoundtripError) -> Self { match err { - util::http::RoundtripError::Http(err) => Self::Http(err), + util::http::RoundtripError::Http(http_err) => match http_err { + util::http::Error::Status(status_code, _) if status_code.as_u16() == 429 => { + Self::RateLimited + } + other_err => Self::Http(other_err), + }, util::http::RoundtripError::Api(err) => match err.error.as_str() { "ESTIMATED_LOSS_GREATER_THAN_MAX_IMPACT" | "No routes found with enough liquidity" diff --git a/crates/solvers/src/infra/dex/zeroex/mod.rs b/crates/solvers/src/infra/dex/zeroex/mod.rs index ac7929bfe2..9529307f0c 100644 --- a/crates/solvers/src/infra/dex/zeroex/mod.rs +++ b/crates/solvers/src/infra/dex/zeroex/mod.rs @@ -150,6 +150,8 @@ pub enum Error { NotFound, #[error("quote does not specify an approval spender")] MissingSpender, + #[error("rate limit exceeded")] + RateLimited, #[error("api error code {code}: {reason}")] Api { code: i64, reason: String }, #[error(transparent)] @@ -166,6 +168,7 @@ impl From> for Error { // past. match err.code { 100 => Self::NotFound, + 429 => Self::RateLimited, _ => Self::Api { code: err.code, reason: err.reason, diff --git a/crates/solvers/src/run.rs b/crates/solvers/src/run.rs index 9eee0b2ac1..5c7197712c 100644 --- a/crates/solvers/src/run.rs +++ b/crates/solvers/src/run.rs @@ -2,6 +2,7 @@ use tokio::signal::unix::{self, SignalKind}; use { crate::{ + boundary::rate_limiter::RateLimiter, domain::solver::{self, Solver}, infra::{cli, config, dex}, }, @@ -28,47 +29,63 @@ async fn run_with(args: cli::Args, bind: Option>) { observe::tracing::initialize_reentrant(&args.log); tracing::info!("running solver engine with {args:#?}"); - let solver = match args.command { + let solver = match &args.command { cli::Command::Baseline { config } => { - let config = config::baseline::file::load(&config).await; + let config = config::baseline::file::load(config).await; Solver::Baseline(solver::Baseline::new(config)) } cli::Command::Naive { config } => { - let config = config::naive::file::load(&config).await; + let config = config::naive::file::load(config).await; Solver::Naive(solver::Naive::new(config)) } cli::Command::Legacy { config } => { - let config = config::legacy::load(&config).await; + let config = config::legacy::load(config).await; Solver::Legacy(solver::Legacy::new(config)) } cli::Command::ZeroEx { config } => { - let config = config::dex::zeroex::file::load(&config).await; + let config = config::dex::zeroex::file::load(config).await; Solver::Dex(solver::Dex::new( dex::Dex::ZeroEx( dex::zeroex::ZeroEx::new(config.zeroex).expect("invalid 0x configuration"), ), config.base, + RateLimiter::new( + args.single_order_solver_rate_limiter.unwrap_or_default(), + args.command.to_lowercase(), + ), )) } cli::Command::Balancer { config } => { - let config = config::dex::balancer::file::load(&config).await; + let config = config::dex::balancer::file::load(config).await; Solver::Dex(solver::Dex::new( dex::Dex::Balancer(dex::balancer::Sor::new(config.sor)), config.base, + RateLimiter::new( + args.single_order_solver_rate_limiter.unwrap_or_default(), + args.command.to_lowercase(), + ), )) } cli::Command::OneInch { config } => { - let config = config::dex::oneinch::file::load(&config).await; + let config = config::dex::oneinch::file::load(config).await; Solver::Dex(solver::Dex::new( dex::Dex::OneInch(dex::oneinch::OneInch::new(config.oneinch).await.unwrap()), config.base, + RateLimiter::new( + args.single_order_solver_rate_limiter.unwrap_or_default(), + args.command.to_lowercase(), + ), )) } cli::Command::ParaSwap { config } => { - let config = config::dex::paraswap::file::load(&config).await; + let config = config::dex::paraswap::file::load(config).await; Solver::Dex(solver::Dex::new( dex::Dex::ParaSwap(dex::paraswap::ParaSwap::new(config.paraswap)), config.base, + RateLimiter::new( + args.single_order_solver_rate_limiter.unwrap_or_default(), + args.command.to_lowercase(), + ), )) } }; From 4c9c8e9c58dc07874d0ef40b4bbf57c74b0f51cb Mon Sep 17 00:00:00 2001 From: ilya Date: Wed, 22 Nov 2023 19:23:18 +0000 Subject: [PATCH 15/20] Retry mechanism --- crates/shared/src/rate_limiter.rs | 2 +- crates/solvers/src/boundary/rate_limiter.rs | 53 +++++++++++++++++++-- crates/solvers/src/domain/solver/dex/mod.rs | 6 +-- 3 files changed, 54 insertions(+), 7 deletions(-) diff --git a/crates/shared/src/rate_limiter.rs b/crates/shared/src/rate_limiter.rs index c280c475c4..34d5035f6e 100644 --- a/crates/shared/src/rate_limiter.rs +++ b/crates/shared/src/rate_limiter.rs @@ -92,7 +92,7 @@ impl RateLimitingStrategy { } /// Calculates back off based on how often we got rate limited in a row. - fn get_current_back_off(&self) -> Duration { + pub fn get_current_back_off(&self) -> Duration { let factor = self .back_off_growth_factor .powf(self.times_rate_limited as f64); diff --git a/crates/solvers/src/boundary/rate_limiter.rs b/crates/solvers/src/boundary/rate_limiter.rs index afb0ab56d5..a962595369 100644 --- a/crates/solvers/src/boundary/rate_limiter.rs +++ b/crates/solvers/src/boundary/rate_limiter.rs @@ -1,26 +1,31 @@ use { - anyhow::Result, + anyhow::{ensure, Context, Result}, shared::rate_limiter::{ RateLimiter as SharedRateLimiter, RateLimitingStrategy as SharedRateLimitingStrategy, }, - std::{future::Future, str::FromStr}, + std::{future::Future, str::FromStr, time::Duration}, thiserror::Error, }; pub struct RateLimiter { inner: SharedRateLimiter, + max_retries: usize, } #[derive(Debug, Clone)] pub struct RateLimitingStrategy { inner: SharedRateLimitingStrategy, + max_retries: usize, } +const DEFAULT_MAX_RETIRES: usize = 2; + impl Default for RateLimitingStrategy { fn default() -> Self { Self { inner: SharedRateLimitingStrategy::default(), + max_retries: DEFAULT_MAX_RETIRES, } } } @@ -29,7 +34,21 @@ impl FromStr for RateLimitingStrategy { type Err = anyhow::Error; fn from_str(config: &str) -> Result { - SharedRateLimitingStrategy::from_str(config).map(|strategy| Self { inner: strategy }) + let mut parts = config.split(','); + + let shared_config = parts.by_ref().take(3).collect::>().join(","); + let inner = SharedRateLimitingStrategy::from_str(&shared_config)?; + + let default_max_retries_str = DEFAULT_MAX_RETIRES.to_string(); + let max_retries = parts.next().unwrap_or(&default_max_retries_str); + let max_retries = max_retries.parse().context("parsing max_retries")?; + + ensure!( + parts.next().is_none(), + "extraneous rate limiting parameters" + ); + + Ok(Self { inner, max_retries }) } } @@ -44,6 +63,7 @@ impl RateLimiter { pub fn new(strategy: RateLimitingStrategy, name: String) -> Self { Self { inner: SharedRateLimiter::from_strategy(strategy.inner, name), + max_retries: strategy.max_retries, } } @@ -57,4 +77,31 @@ impl RateLimiter { .await .map_err(|_| RateLimiterError::RateLimited) } + + pub async fn execute_with_retries( + &self, + task: F, + requires_back_off: impl Fn(&T) -> bool + Clone, + ) -> Result + where + F: Fn() -> Fut, + Fut: Future, + { + let mut retries = 0; + while retries < self.max_retries { + match self.execute(task(), requires_back_off.clone()).await { + Ok(result) => return Ok(result), + Err(RateLimiterError::RateLimited) => { + let back_off_duration = self.get_back_off_duration(); + tokio::time::sleep(back_off_duration).await; + retries += 1; + } + } + } + Err(RateLimiterError::RateLimited) + } + + fn get_back_off_duration(&self) -> Duration { + self.inner.strategy.lock().unwrap().get_current_back_off() + } } diff --git a/crates/solvers/src/domain/solver/dex/mod.rs b/crates/solvers/src/domain/solver/dex/mod.rs index f87712e6b6..82cb407a25 100644 --- a/crates/solvers/src/domain/solver/dex/mod.rs +++ b/crates/solvers/src/domain/solver/dex/mod.rs @@ -15,7 +15,7 @@ use { infra, }, futures::{future, stream, FutureExt, StreamExt}, - std::{future::Future, num::NonZeroUsize, sync::Arc}, + std::{num::NonZeroUsize, sync::Arc}, tracing::Instrument, }; @@ -151,8 +151,8 @@ impl Dex { let order = order.get(); let dex_order = self.fills.dex_order(order, tokens)?; let swap = rate_limiter - .execute( - self.try_solve(order, &dex_order, tokens, gas_price), + .execute_with_retries( + || self.try_solve(order, &dex_order, tokens, gas_price), |result| matches!(result, Err(infra::dex::Error::RateLimited)), ) .await From cf6e0dcf40da9bb7e6b8e9d3e0ce3406d867efc1 Mon Sep 17 00:00:00 2001 From: ilya Date: Wed, 22 Nov 2023 20:54:36 +0000 Subject: [PATCH 16/20] Tests --- crates/shared/src/rate_limiter.rs | 6 +- crates/solvers/src/boundary/rate_limiter.rs | 128 ++++++++++++++++++-- 2 files changed, 120 insertions(+), 14 deletions(-) diff --git a/crates/shared/src/rate_limiter.rs b/crates/shared/src/rate_limiter.rs index 34d5035f6e..e3e67200f9 100644 --- a/crates/shared/src/rate_limiter.rs +++ b/crates/shared/src/rate_limiter.rs @@ -33,9 +33,9 @@ pub struct RateLimitingStrategy { drop_requests_until: Instant, /// How many requests got rate limited in a row. times_rate_limited: u64, - back_off_growth_factor: f64, - min_back_off: Duration, - max_back_off: Duration, + pub back_off_growth_factor: f64, + pub min_back_off: Duration, + pub max_back_off: Duration, } impl Default for RateLimitingStrategy { diff --git a/crates/solvers/src/boundary/rate_limiter.rs b/crates/solvers/src/boundary/rate_limiter.rs index a962595369..3b60d15cb1 100644 --- a/crates/solvers/src/boundary/rate_limiter.rs +++ b/crates/solvers/src/boundary/rate_limiter.rs @@ -2,9 +2,10 @@ use { anyhow::{ensure, Context, Result}, shared::rate_limiter::{ RateLimiter as SharedRateLimiter, + RateLimiterError as SharedRateLimiterError, RateLimitingStrategy as SharedRateLimitingStrategy, }, - std::{future::Future, str::FromStr, time::Duration}, + std::{future::Future, ops::Add, str::FromStr, time::Duration}, thiserror::Error, }; @@ -52,7 +53,7 @@ impl FromStr for RateLimitingStrategy { } } -#[derive(Error, Debug, Clone, Default)] +#[derive(Error, Debug, Clone, Default, PartialEq)] pub enum RateLimiterError { #[default] #[error("rate limited")] @@ -75,7 +76,9 @@ impl RateLimiter { self.inner .execute(task, requires_back_off) .await - .map_err(|_| RateLimiterError::RateLimited) + .map_err(|err| match err { + SharedRateLimiterError::RateLimited => RateLimiterError::RateLimited, + }) } pub async fn execute_with_retries( @@ -89,19 +92,122 @@ impl RateLimiter { { let mut retries = 0; while retries < self.max_retries { - match self.execute(task(), requires_back_off.clone()).await { - Ok(result) => return Ok(result), - Err(RateLimiterError::RateLimited) => { - let back_off_duration = self.get_back_off_duration(); - tokio::time::sleep(back_off_duration).await; - retries += 1; - } + let result = self.execute(task(), requires_back_off.clone()).await; + let should_retry = match &result { + Ok(result) => requires_back_off.clone()(result), + Err(RateLimiterError::RateLimited) => true, + }; + + if should_retry { + let back_off_duration = self.get_back_off_duration(); + tokio::time::sleep(back_off_duration).await; + retries += 1; + } else { + return result; } } Err(RateLimiterError::RateLimited) } fn get_back_off_duration(&self) -> Duration { - self.inner.strategy.lock().unwrap().get_current_back_off() + self.inner + .strategy + .lock() + .unwrap() + .get_current_back_off() + // add 100 millis to make sure the RateLimiter updated it's counter + .add(Duration::from_millis(100)) + } +} + +#[cfg(test)] +mod tests { + use { + super::*, + std::sync::atomic::{AtomicUsize, Ordering}, + }; + + #[tokio::test] + async fn test_execute_with_retries() { + let strategy = RateLimitingStrategy::default(); + let rate_limiter = RateLimiter::new(strategy, "test".to_string()); + let call_count = AtomicUsize::new(0); + + let task = || { + let count = call_count.fetch_add(1, Ordering::SeqCst); + async move { + if count < 1 { + Err(RateLimiterError::RateLimited) + } else { + Ok(42) + } + } + }; + + let result = rate_limiter + .execute_with_retries(task, |res| { + let back_off_required = matches!(res, Err(RateLimiterError::RateLimited)); + back_off_required + }) + .await + .and_then(|result: Result| result); + assert_eq!(result, Ok(42)); + assert_eq!(call_count.load(Ordering::SeqCst), 2); + } + + #[tokio::test] + async fn test_execute_with_retries_exceeds() { + let strategy = RateLimitingStrategy::default(); + let rate_limiter = RateLimiter::new(strategy, "test".to_string()); + let call_count = AtomicUsize::new(0); + + let task = || { + call_count.fetch_add(1, Ordering::SeqCst); + async move { Err(RateLimiterError::RateLimited) } + }; + + let result = rate_limiter + .execute_with_retries(task, |res| { + let back_off_required = matches!(res, Err(RateLimiterError::RateLimited)); + back_off_required + }) + .await + .and_then(|result: Result| result); + assert_eq!(result, Err(RateLimiterError::RateLimited)); + assert_eq!(call_count.load(Ordering::SeqCst), 2); + } +} + +#[cfg(test)] +mod config_tests { + use super::*; + + #[test] + fn parse_rate_limiting_strategy() { + let config_str = "1.5,10,30,3"; + let strategy: RateLimitingStrategy = config_str.parse().unwrap(); + assert_eq!(strategy.inner.back_off_growth_factor, 1.5); + assert_eq!(strategy.inner.min_back_off, Duration::from_secs(10)); + assert_eq!(strategy.inner.max_back_off, Duration::from_secs(30)); + assert_eq!(strategy.max_retries, 3); + } + + #[test] + fn parse_rate_limiting_strategy_with_default_retries() { + let config_str = "1.5,10,30"; + let strategy: RateLimitingStrategy = config_str.parse().unwrap(); + assert_eq!(strategy.max_retries, DEFAULT_MAX_RETIRES); + } + + #[test] + fn parse_invalid_rate_limiting_strategy() { + let config_str = "invalid"; + assert!(config_str.parse::().is_err()); + } + + #[test] + fn parse_too_many_args_rate_limiting_strategy() { + let config_str = "1.5,10,30,3,10"; + assert!(config_str.parse::().is_err()); } } From 432d0e968518c177a7b90e0d132399385b495786 Mon Sep 17 00:00:00 2001 From: ilya Date: Wed, 22 Nov 2023 22:04:49 +0000 Subject: [PATCH 17/20] Use default retries = 1 --- crates/solvers/src/boundary/rate_limiter.rs | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/crates/solvers/src/boundary/rate_limiter.rs b/crates/solvers/src/boundary/rate_limiter.rs index 3b60d15cb1..2fae7cf4f7 100644 --- a/crates/solvers/src/boundary/rate_limiter.rs +++ b/crates/solvers/src/boundary/rate_limiter.rs @@ -20,7 +20,7 @@ pub struct RateLimitingStrategy { max_retries: usize, } -const DEFAULT_MAX_RETIRES: usize = 2; +const DEFAULT_MAX_RETIRES: usize = 1; impl Default for RateLimitingStrategy { fn default() -> Self { @@ -129,7 +129,10 @@ mod tests { #[tokio::test] async fn test_execute_with_retries() { - let strategy = RateLimitingStrategy::default(); + let strategy = RateLimitingStrategy { + inner: SharedRateLimitingStrategy::default(), + max_retries: 2, + }; let rate_limiter = RateLimiter::new(strategy, "test".to_string()); let call_count = AtomicUsize::new(0); @@ -157,7 +160,10 @@ mod tests { #[tokio::test] async fn test_execute_with_retries_exceeds() { - let strategy = RateLimitingStrategy::default(); + let strategy = RateLimitingStrategy { + inner: SharedRateLimitingStrategy::default(), + max_retries: 2, + }; let rate_limiter = RateLimiter::new(strategy, "test".to_string()); let call_count = AtomicUsize::new(0); From f688b18cf7b295d608fa88517a2a2225cd4c4359 Mon Sep 17 00:00:00 2001 From: ilya Date: Thu, 23 Nov 2023 21:57:38 +0000 Subject: [PATCH 18/20] Review fixes --- Cargo.lock | 18 ++ crates/shared/src/rate_limiter.rs | 90 +++++++++- crates/solvers/Cargo.toml | 2 + crates/solvers/src/boundary/rate_limiter.rs | 185 ++------------------ crates/solvers/src/domain/solver/dex/mod.rs | 31 ++-- crates/solvers/src/infra/cli.rs | 21 --- crates/solvers/src/infra/config/dex/file.rs | 33 +++- crates/solvers/src/infra/config/dex/mod.rs | 8 +- crates/solvers/src/run.rs | 44 ++--- 9 files changed, 185 insertions(+), 247 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 7a9de100fd..1031586604 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2370,6 +2370,22 @@ version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c4a1e36c821dbe04574f602848a19f742f4fb3c98d40449f11bcad18d6b17421" +[[package]] +name = "humantime" +version = "2.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9a3a5bfb195931eeb336b2a7b4d761daec841b97f947d34394601737a7bba5e4" + +[[package]] +name = "humantime-serde" +version = "1.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "57a3db5ea5923d99402c94e9feb261dc5ee9b4efa158b0315f788cf549cc200c" +dependencies = [ + "humantime", + "serde", +] + [[package]] name = "hyper" version = "0.14.27" @@ -4154,6 +4170,8 @@ dependencies = [ "futures", "glob", "hex", + "humantime", + "humantime-serde", "hyper", "itertools 0.11.0", "model", diff --git a/crates/shared/src/rate_limiter.rs b/crates/shared/src/rate_limiter.rs index e3e67200f9..206d603673 100644 --- a/crates/shared/src/rate_limiter.rs +++ b/crates/shared/src/rate_limiter.rs @@ -33,9 +33,9 @@ pub struct RateLimitingStrategy { drop_requests_until: Instant, /// How many requests got rate limited in a row. times_rate_limited: u64, - pub back_off_growth_factor: f64, - pub min_back_off: Duration, - pub max_back_off: Duration, + back_off_growth_factor: f64, + min_back_off: Duration, + max_back_off: Duration, } impl Default for RateLimitingStrategy { @@ -174,7 +174,7 @@ impl RateLimiter { } } -#[derive(Error, Debug, Clone, Default)] +#[derive(Error, Debug, Clone, Default, PartialEq)] pub enum RateLimiterError { #[default] #[error("rate limited")] @@ -221,6 +221,30 @@ impl RateLimiter { Ok(result) } + + pub async fn execute_with_back_off( + &self, + task: impl Future, + requires_back_off: impl Fn(&T) -> bool, + ) -> Result { + if let Some(back_off_duration) = self.get_back_off_duration_if_limited() { + tokio::time::sleep(back_off_duration).await; + } + + self.execute(task, requires_back_off).await + } + + fn get_back_off_duration_if_limited(&self) -> Option { + let strategy = self.strategy.lock().unwrap(); + let now = Instant::now(); + + if strategy.drop_requests_until > now { + let back_off_duration = strategy.drop_requests_until - now; + Some(back_off_duration) + } else { + None + } + } } /// Shared module with common back-off checks. @@ -236,7 +260,7 @@ pub mod back_off { #[cfg(test)] mod tests { - use {super::*, futures::FutureExt, tokio::time::sleep}; + use {super::*, futures::FutureExt, std::ops::Add, tokio::time::sleep}; #[test] fn current_back_off_does_not_panic() { @@ -317,4 +341,60 @@ mod tests { rate_limiter.strategy().get_current_back_off() ); } + + #[tokio::test] + async fn test_execute_with_no_back_off() { + let timeout = Duration::from_secs(30); + let strategy = RateLimitingStrategy::try_new(1.0, timeout, timeout).unwrap(); + let original_drop_until = strategy.drop_requests_until; + let rate_limiter = RateLimiter::from_strategy(strategy, "test_no_back_off".to_string()); + + let result = rate_limiter + .execute_with_back_off(async { 1 }, |_| false) + .await + .unwrap(); + + assert_eq!(result, 1); + { + let current_strategy = rate_limiter.strategy.lock().unwrap(); + assert!(current_strategy.drop_requests_until < original_drop_until.add(timeout)); + } + + let result = rate_limiter.execute(async { 1 }, |_| false).await.unwrap(); + assert_eq!(result, 1); + } + + #[tokio::test] + async fn test_execute_with_back_off() { + let timeout = Duration::from_secs(3); + let strategy = RateLimitingStrategy::try_new(1.0, timeout, timeout).unwrap(); + let original_drop_until = strategy.drop_requests_until; + let rate_limiter = RateLimiter::from_strategy(strategy, "test_back_off".to_string()); + + let result = rate_limiter + .execute_with_back_off(async { 1 }, |_| true) + .await + .unwrap(); + + assert_eq!(result, 1); + let drop_until = { + let current_strategy = rate_limiter.strategy.lock().unwrap(); + let drop_until = current_strategy.drop_requests_until; + assert!(drop_until >= original_drop_until.add(timeout)); + drop_until + }; + + let result = rate_limiter.execute(async { 1 }, |_| false).await; + assert_eq!(result, Err(RateLimiterError::RateLimited)); + { + let current_strategy = rate_limiter.strategy.lock().unwrap(); + assert_eq!(current_strategy.drop_requests_until, drop_until); + } + + let result = rate_limiter + .execute_with_back_off(async { 1 }, |_| false) + .await + .unwrap(); + assert_eq!(result, 1); + } } diff --git a/crates/solvers/Cargo.toml b/crates/solvers/Cargo.toml index c34b389d50..cc253af3b1 100644 --- a/crates/solvers/Cargo.toml +++ b/crates/solvers/Cargo.toml @@ -21,6 +21,8 @@ ethereum-types = "0.14" ethrpc = { path = "../ethrpc" } futures = "0.3" hex = "0.4" +humantime = "2.1.0" +humantime-serde = "1.1.1" hyper = "0.14" itertools = "0.11" num = "0.4" diff --git a/crates/solvers/src/boundary/rate_limiter.rs b/crates/solvers/src/boundary/rate_limiter.rs index 2fae7cf4f7..e5b6c5367c 100644 --- a/crates/solvers/src/boundary/rate_limiter.rs +++ b/crates/solvers/src/boundary/rate_limiter.rs @@ -1,55 +1,31 @@ use { - anyhow::{ensure, Context, Result}, + anyhow::Result, shared::rate_limiter::{ RateLimiter as SharedRateLimiter, RateLimiterError as SharedRateLimiterError, RateLimitingStrategy as SharedRateLimitingStrategy, }, - std::{future::Future, ops::Add, str::FromStr, time::Duration}, + std::{future::Future, time::Duration}, thiserror::Error, }; pub struct RateLimiter { inner: SharedRateLimiter, - max_retries: usize, } #[derive(Debug, Clone)] pub struct RateLimitingStrategy { inner: SharedRateLimitingStrategy, - max_retries: usize, } -const DEFAULT_MAX_RETIRES: usize = 1; - -impl Default for RateLimitingStrategy { - fn default() -> Self { - Self { - inner: SharedRateLimitingStrategy::default(), - max_retries: DEFAULT_MAX_RETIRES, - } - } -} - -impl FromStr for RateLimitingStrategy { - type Err = anyhow::Error; - - fn from_str(config: &str) -> Result { - let mut parts = config.split(','); - - let shared_config = parts.by_ref().take(3).collect::>().join(","); - let inner = SharedRateLimitingStrategy::from_str(&shared_config)?; - - let default_max_retries_str = DEFAULT_MAX_RETIRES.to_string(); - let max_retries = parts.next().unwrap_or(&default_max_retries_str); - let max_retries = max_retries.parse().context("parsing max_retries")?; - - ensure!( - parts.next().is_none(), - "extraneous rate limiting parameters" - ); - - Ok(Self { inner, max_retries }) +impl RateLimitingStrategy { + pub fn try_new( + back_off_growth_factor: f64, + min_back_off: Duration, + max_back_off: Duration, + ) -> Result { + SharedRateLimitingStrategy::try_new(back_off_growth_factor, min_back_off, max_back_off) + .map(|shared| Self { inner: shared }) } } @@ -64,156 +40,19 @@ impl RateLimiter { pub fn new(strategy: RateLimitingStrategy, name: String) -> Self { Self { inner: SharedRateLimiter::from_strategy(strategy.inner, name), - max_retries: strategy.max_retries, } } - pub async fn execute( + pub async fn execute_with_back_off( &self, task: impl Future, requires_back_off: impl Fn(&T) -> bool, ) -> Result { self.inner - .execute(task, requires_back_off) + .execute_with_back_off(task, requires_back_off) .await .map_err(|err| match err { SharedRateLimiterError::RateLimited => RateLimiterError::RateLimited, }) } - - pub async fn execute_with_retries( - &self, - task: F, - requires_back_off: impl Fn(&T) -> bool + Clone, - ) -> Result - where - F: Fn() -> Fut, - Fut: Future, - { - let mut retries = 0; - while retries < self.max_retries { - let result = self.execute(task(), requires_back_off.clone()).await; - let should_retry = match &result { - Ok(result) => requires_back_off.clone()(result), - Err(RateLimiterError::RateLimited) => true, - }; - - if should_retry { - let back_off_duration = self.get_back_off_duration(); - tokio::time::sleep(back_off_duration).await; - retries += 1; - } else { - return result; - } - } - Err(RateLimiterError::RateLimited) - } - - fn get_back_off_duration(&self) -> Duration { - self.inner - .strategy - .lock() - .unwrap() - .get_current_back_off() - // add 100 millis to make sure the RateLimiter updated it's counter - .add(Duration::from_millis(100)) - } -} - -#[cfg(test)] -mod tests { - use { - super::*, - std::sync::atomic::{AtomicUsize, Ordering}, - }; - - #[tokio::test] - async fn test_execute_with_retries() { - let strategy = RateLimitingStrategy { - inner: SharedRateLimitingStrategy::default(), - max_retries: 2, - }; - let rate_limiter = RateLimiter::new(strategy, "test".to_string()); - let call_count = AtomicUsize::new(0); - - let task = || { - let count = call_count.fetch_add(1, Ordering::SeqCst); - async move { - if count < 1 { - Err(RateLimiterError::RateLimited) - } else { - Ok(42) - } - } - }; - - let result = rate_limiter - .execute_with_retries(task, |res| { - let back_off_required = matches!(res, Err(RateLimiterError::RateLimited)); - back_off_required - }) - .await - .and_then(|result: Result| result); - assert_eq!(result, Ok(42)); - assert_eq!(call_count.load(Ordering::SeqCst), 2); - } - - #[tokio::test] - async fn test_execute_with_retries_exceeds() { - let strategy = RateLimitingStrategy { - inner: SharedRateLimitingStrategy::default(), - max_retries: 2, - }; - let rate_limiter = RateLimiter::new(strategy, "test".to_string()); - let call_count = AtomicUsize::new(0); - - let task = || { - call_count.fetch_add(1, Ordering::SeqCst); - async move { Err(RateLimiterError::RateLimited) } - }; - - let result = rate_limiter - .execute_with_retries(task, |res| { - let back_off_required = matches!(res, Err(RateLimiterError::RateLimited)); - back_off_required - }) - .await - .and_then(|result: Result| result); - assert_eq!(result, Err(RateLimiterError::RateLimited)); - assert_eq!(call_count.load(Ordering::SeqCst), 2); - } -} - -#[cfg(test)] -mod config_tests { - use super::*; - - #[test] - fn parse_rate_limiting_strategy() { - let config_str = "1.5,10,30,3"; - let strategy: RateLimitingStrategy = config_str.parse().unwrap(); - assert_eq!(strategy.inner.back_off_growth_factor, 1.5); - assert_eq!(strategy.inner.min_back_off, Duration::from_secs(10)); - assert_eq!(strategy.inner.max_back_off, Duration::from_secs(30)); - assert_eq!(strategy.max_retries, 3); - } - - #[test] - fn parse_rate_limiting_strategy_with_default_retries() { - let config_str = "1.5,10,30"; - let strategy: RateLimitingStrategy = config_str.parse().unwrap(); - assert_eq!(strategy.max_retries, DEFAULT_MAX_RETIRES); - } - - #[test] - fn parse_invalid_rate_limiting_strategy() { - let config_str = "invalid"; - assert!(config_str.parse::().is_err()); - } - - #[test] - fn parse_too_many_args_rate_limiting_strategy() { - let config_str = "1.5,10,30,3,10"; - assert!(config_str.parse::().is_err()); - } } diff --git a/crates/solvers/src/domain/solver/dex/mod.rs b/crates/solvers/src/domain/solver/dex/mod.rs index 82cb407a25..5d09efa1d7 100644 --- a/crates/solvers/src/domain/solver/dex/mod.rs +++ b/crates/solvers/src/domain/solver/dex/mod.rs @@ -3,7 +3,7 @@ use { crate::{ - boundary::rate_limiter::RateLimiter, + boundary::rate_limiter::{RateLimiter, RateLimiterError}, domain::{ self, auction, @@ -93,14 +93,9 @@ impl Dex { .enumerate() .map(|(i, order)| { let span = tracing::info_span!("solve", order = %order.get().uid); - self.solve_order( - order, - &auction.tokens, - auction.gas_price, - self.rate_limiter.clone(), - ) - .map(move |solution| solution.map(|s| s.with_id(solution::Id(i as u64)))) - .instrument(span) + self.solve_order(order, &auction.tokens, auction.gas_price) + .map(move |solution| solution.map(|s| s.with_id(solution::Id(i as u64)))) + .instrument(span) }) .buffer_unordered(self.concurrent_requests.get()) .filter_map(future::ready) @@ -132,10 +127,10 @@ impl Dex { err @ infra::dex::Error::OrderNotSupported => { tracing::debug!(?err, "skipping order") } - infra::dex::Error::Other(err) => tracing::warn!(?err, "failed to get swap"), err @ infra::dex::Error::RateLimited => { - tracing::debug!(?err, "encountered rate limit, retrying") + tracing::debug!(?err, "encountered rate limit") } + infra::dex::Error::Other(err) => tracing::warn!(?err, "failed to get swap"), } err }) @@ -146,19 +141,19 @@ impl Dex { order: order::UserOrder<'_>, tokens: &auction::Tokens, gas_price: auction::GasPrice, - rate_limiter: Arc, ) -> Option { let order = order.get(); let dex_order = self.fills.dex_order(order, tokens)?; - let swap = rate_limiter - .execute_with_retries( - || self.try_solve(order, &dex_order, tokens, gas_price), + let swap = self + .rate_limiter + .clone() + .execute_with_back_off( + self.try_solve(order, &dex_order, tokens, gas_price), |result| matches!(result, Err(infra::dex::Error::RateLimited)), ) .await - .map_err(|_| { - tracing::debug!("rate limit retries exceeded, unable to complete operation"); - infra::dex::Error::RateLimited + .map_err(|err| match err { + RateLimiterError::RateLimited => infra::dex::Error::RateLimited, }) .and_then(|result| result) .ok()?; diff --git a/crates/solvers/src/infra/cli.rs b/crates/solvers/src/infra/cli.rs index 6447e6f737..396b6c50aa 100644 --- a/crates/solvers/src/infra/cli.rs +++ b/crates/solvers/src/infra/cli.rs @@ -1,7 +1,6 @@ //! CLI arguments for the `solvers` binary. use { - crate::boundary::rate_limiter::RateLimitingStrategy, clap::{Parser, Subcommand}, std::{net::SocketAddr, path::PathBuf}, }; @@ -24,12 +23,6 @@ pub struct Args { #[command(subcommand)] pub command: Command, - - /// Configures the back off strategy for single order solvers. Requests - /// issued while back off is active get dropped entirely. Expects - /// "= 1.0>,,". - #[clap(long, env)] - pub single_order_solver_rate_limiter: Option, } /// The solver engine to run. The config field is a path to the solver @@ -73,17 +66,3 @@ pub enum Command { config: PathBuf, }, } - -impl Command { - pub fn to_lowercase(&self) -> String { - match self { - Command::Baseline { .. } => "baseline".to_string(), - Command::Naive { .. } => "naive".to_string(), - Command::Legacy { .. } => "legacy".to_string(), - Command::Balancer { .. } => "balancer".to_string(), - Command::ZeroEx { .. } => "zeroex".to_string(), - Command::OneInch { .. } => "oneinch".to_string(), - Command::ParaSwap { .. } => "paraswap".to_string(), - } - } -} diff --git a/crates/solvers/src/infra/config/dex/file.rs b/crates/solvers/src/infra/config/dex/file.rs index 6b101e96b8..c9bc338a88 100644 --- a/crates/solvers/src/infra/config/dex/file.rs +++ b/crates/solvers/src/infra/config/dex/file.rs @@ -2,6 +2,7 @@ use { crate::{ + boundary::rate_limiter::RateLimitingStrategy, domain::{dex::slippage, eth, Risk}, infra::{blockchain, config::unwrap_or_log, contracts}, util::serialize, @@ -9,7 +10,7 @@ use { bigdecimal::BigDecimal, serde::{de::DeserializeOwned, Deserialize}, serde_with::serde_as, - std::{fmt::Debug, num::NonZeroUsize, path::Path}, + std::{fmt::Debug, num::NonZeroUsize, path::Path, time::Duration}, tokio::fs, }; @@ -48,6 +49,18 @@ struct Config { /// (gas_amount_factor, gas_price_factor, nmb_orders_factor, intercept) risk_parameters: (f64, f64, f64, f64), + /// Back-off growth factor for rate limiting. + #[serde(default = "default_back_off_growth_factor")] + back_off_growth_factor: f64, + + /// Minimum back-off time in seconds for rate limiting. + #[serde(with = "humantime_serde", default = "default_min_back_off")] + min_back_off: Duration, + + /// Maximum back-off time in seconds for rate limiting. + #[serde(with = "humantime_serde", default = "default_max_back_off")] + max_back_off: Duration, + /// Settings specific to the wrapped dex API. dex: toml::Value, } @@ -64,6 +77,18 @@ fn default_smallest_partial_fill() -> eth::U256 { eth::U256::exp10(16) // 0.01 ETH } +fn default_back_off_growth_factor() -> f64 { + 1.0 +} + +fn default_min_back_off() -> Duration { + Duration::default() +} + +fn default_max_back_off() -> Duration { + Duration::default() +} + /// Loads the base solver configuration from a TOML file. /// /// # Panics @@ -117,6 +142,12 @@ pub async fn load(path: &Path) -> (super::Config, T) { nmb_orders_factor: config.risk_parameters.2, intercept: config.risk_parameters.3, }, + rate_limiting_strategy: RateLimitingStrategy::try_new( + config.back_off_growth_factor, + config.min_back_off, + config.max_back_off, + ) + .unwrap(), }; (config, dex) } diff --git a/crates/solvers/src/infra/config/dex/mod.rs b/crates/solvers/src/infra/config/dex/mod.rs index ac8cb2354d..400e92a6b9 100644 --- a/crates/solvers/src/infra/config/dex/mod.rs +++ b/crates/solvers/src/infra/config/dex/mod.rs @@ -5,15 +5,20 @@ pub mod paraswap; pub mod zeroex; use { - crate::domain::{dex::slippage, eth, Risk}, + crate::{ + boundary::rate_limiter::RateLimitingStrategy, + domain::{dex::slippage, eth, Risk}, + }, std::num::NonZeroUsize, }; +#[derive(Clone)] pub struct Contracts { pub settlement: eth::ContractAddress, pub authenticator: eth::ContractAddress, } +#[derive(Clone)] pub struct Config { pub node_url: reqwest::Url, pub contracts: Contracts, @@ -21,4 +26,5 @@ pub struct Config { pub concurrent_requests: NonZeroUsize, pub smallest_partial_fill: eth::Ether, pub risk: Risk, + pub rate_limiting_strategy: RateLimitingStrategy, } diff --git a/crates/solvers/src/run.rs b/crates/solvers/src/run.rs index 5c7197712c..f5de7307b1 100644 --- a/crates/solvers/src/run.rs +++ b/crates/solvers/src/run.rs @@ -29,63 +29,51 @@ async fn run_with(args: cli::Args, bind: Option>) { observe::tracing::initialize_reentrant(&args.log); tracing::info!("running solver engine with {args:#?}"); - let solver = match &args.command { + let solver = match args.command { cli::Command::Baseline { config } => { - let config = config::baseline::file::load(config).await; + let config = config::baseline::file::load(&config).await; Solver::Baseline(solver::Baseline::new(config)) } cli::Command::Naive { config } => { - let config = config::naive::file::load(config).await; + let config = config::naive::file::load(&config).await; Solver::Naive(solver::Naive::new(config)) } cli::Command::Legacy { config } => { - let config = config::legacy::load(config).await; + let config = config::legacy::load(&config).await; Solver::Legacy(solver::Legacy::new(config)) } cli::Command::ZeroEx { config } => { - let config = config::dex::zeroex::file::load(config).await; + let config = config::dex::zeroex::file::load(&config).await; Solver::Dex(solver::Dex::new( dex::Dex::ZeroEx( dex::zeroex::ZeroEx::new(config.zeroex).expect("invalid 0x configuration"), ), - config.base, - RateLimiter::new( - args.single_order_solver_rate_limiter.unwrap_or_default(), - args.command.to_lowercase(), - ), + config.base.clone(), + RateLimiter::new(config.base.rate_limiting_strategy, "dex_api".to_string()), )) } cli::Command::Balancer { config } => { - let config = config::dex::balancer::file::load(config).await; + let config = config::dex::balancer::file::load(&config).await; Solver::Dex(solver::Dex::new( dex::Dex::Balancer(dex::balancer::Sor::new(config.sor)), - config.base, - RateLimiter::new( - args.single_order_solver_rate_limiter.unwrap_or_default(), - args.command.to_lowercase(), - ), + config.base.clone(), + RateLimiter::new(config.base.rate_limiting_strategy, "dex_api".to_string()), )) } cli::Command::OneInch { config } => { - let config = config::dex::oneinch::file::load(config).await; + let config = config::dex::oneinch::file::load(&config).await; Solver::Dex(solver::Dex::new( dex::Dex::OneInch(dex::oneinch::OneInch::new(config.oneinch).await.unwrap()), - config.base, - RateLimiter::new( - args.single_order_solver_rate_limiter.unwrap_or_default(), - args.command.to_lowercase(), - ), + config.base.clone(), + RateLimiter::new(config.base.rate_limiting_strategy, "dex_api".to_string()), )) } cli::Command::ParaSwap { config } => { - let config = config::dex::paraswap::file::load(config).await; + let config = config::dex::paraswap::file::load(&config).await; Solver::Dex(solver::Dex::new( dex::Dex::ParaSwap(dex::paraswap::ParaSwap::new(config.paraswap)), - config.base, - RateLimiter::new( - args.single_order_solver_rate_limiter.unwrap_or_default(), - args.command.to_lowercase(), - ), + config.base.clone(), + RateLimiter::new(config.base.rate_limiting_strategy, "dex_api".to_string()), )) } }; From 58764c4853b71b355cf0256863466b06c88d92e7 Mon Sep 17 00:00:00 2001 From: ilya Date: Thu, 23 Nov 2023 22:06:22 +0000 Subject: [PATCH 19/20] Remove redundant Arc --- crates/solvers/src/domain/solver/dex/mod.rs | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/crates/solvers/src/domain/solver/dex/mod.rs b/crates/solvers/src/domain/solver/dex/mod.rs index 5d09efa1d7..041ef6b765 100644 --- a/crates/solvers/src/domain/solver/dex/mod.rs +++ b/crates/solvers/src/domain/solver/dex/mod.rs @@ -15,7 +15,7 @@ use { infra, }, futures::{future, stream, FutureExt, StreamExt}, - std::{num::NonZeroUsize, sync::Arc}, + std::num::NonZeroUsize, tracing::Instrument, }; @@ -42,7 +42,7 @@ pub struct Dex { risk: domain::Risk, /// Handles 429 Too Many Requests error with a retry mechanism - rate_limiter: Arc, + rate_limiter: RateLimiter, } impl Dex { @@ -62,7 +62,7 @@ impl Dex { concurrent_requests: config.concurrent_requests, fills: Fills::new(config.smallest_partial_fill), risk: config.risk, - rate_limiter: Arc::new(rate_limiter), + rate_limiter, } } @@ -146,7 +146,6 @@ impl Dex { let dex_order = self.fills.dex_order(order, tokens)?; let swap = self .rate_limiter - .clone() .execute_with_back_off( self.try_solve(order, &dex_order, tokens, gas_price), |result| matches!(result, Err(infra::dex::Error::RateLimited)), From 840f59f0c67cc60419b8645b012f149448c02bf8 Mon Sep 17 00:00:00 2001 From: ilya Date: Thu, 23 Nov 2023 22:10:47 +0000 Subject: [PATCH 20/20] Minor fixes --- crates/shared/src/rate_limiter.rs | 2 +- crates/solvers/src/infra/dex/mod.rs | 2 +- crates/solvers/src/infra/dex/oneinch/mod.rs | 2 +- crates/solvers/src/infra/dex/zeroex/mod.rs | 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) diff --git a/crates/shared/src/rate_limiter.rs b/crates/shared/src/rate_limiter.rs index 206d603673..9717f8bad5 100644 --- a/crates/shared/src/rate_limiter.rs +++ b/crates/shared/src/rate_limiter.rs @@ -92,7 +92,7 @@ impl RateLimitingStrategy { } /// Calculates back off based on how often we got rate limited in a row. - pub fn get_current_back_off(&self) -> Duration { + fn get_current_back_off(&self) -> Duration { let factor = self .back_off_growth_factor .powf(self.times_rate_limited as f64); diff --git a/crates/solvers/src/infra/dex/mod.rs b/crates/solvers/src/infra/dex/mod.rs index 774a9405a1..4f2a35bb9e 100644 --- a/crates/solvers/src/infra/dex/mod.rs +++ b/crates/solvers/src/infra/dex/mod.rs @@ -46,7 +46,7 @@ pub enum Error { OrderNotSupported, #[error("no valid swap interaction could be found")] NotFound, - #[error("rate limit exceeded")] + #[error("rate limited")] RateLimited, #[error(transparent)] Other(Box), diff --git a/crates/solvers/src/infra/dex/oneinch/mod.rs b/crates/solvers/src/infra/dex/oneinch/mod.rs index c9af82652b..6ef480a810 100644 --- a/crates/solvers/src/infra/dex/oneinch/mod.rs +++ b/crates/solvers/src/infra/dex/oneinch/mod.rs @@ -165,7 +165,7 @@ pub enum Error { OrderNotSupported, #[error("no valid swap could be found")] NotFound, - #[error("rate limit exceeded")] + #[error("rate limited")] RateLimited, #[error("api error {code}: {description}")] Api { code: i32, description: String }, diff --git a/crates/solvers/src/infra/dex/zeroex/mod.rs b/crates/solvers/src/infra/dex/zeroex/mod.rs index 9529307f0c..43834cc24a 100644 --- a/crates/solvers/src/infra/dex/zeroex/mod.rs +++ b/crates/solvers/src/infra/dex/zeroex/mod.rs @@ -150,7 +150,7 @@ pub enum Error { NotFound, #[error("quote does not specify an approval spender")] MissingSpender, - #[error("rate limit exceeded")] + #[error("rate limited")] RateLimited, #[error("api error code {code}: {reason}")] Api { code: i64, reason: String },