Skip to content

Commit

Permalink
Remove manual batching (#2064)
Browse files Browse the repository at this point in the history
# Description
By now we have a `Web3` implementation that automatically batches calls
under the hood, meaning that "manually" setting up batch calls are
probably detrimental because those batches will not become part of the
regular bigger batches.

# Changes
Drop manual batching from uni v2 and balancer v2 liquidity fetching.

## How to test
e2e should still pass
  • Loading branch information
MartinquaXD authored Nov 22, 2023
1 parent 47e3707 commit d31e240
Show file tree
Hide file tree
Showing 9 changed files with 86 additions and 493 deletions.
16 changes: 2 additions & 14 deletions crates/shared/src/sources/balancer_v2/pool_fetching/registry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,7 @@ use {
ethcontract::{dyns::DynAllEventsBuilder, errors::MethodError, BlockId, Instance, H256},
ethrpc::{
current_block::{BlockNumberHash, BlockRetrieving},
Web3,
Web3CallBatch,
Web3Transport,
MAX_BATCH_SIZE,
},
futures::future,
hex_literal::hex,
Expand Down Expand Up @@ -61,7 +58,6 @@ pub struct Registry<Factory>
where
Factory: FactoryIndexing,
{
web3: Web3,
fetcher: Arc<dyn PoolInfoFetching<Factory>>,
updater: PoolUpdater<Factory>,
}
Expand All @@ -78,18 +74,13 @@ where
initial_pools: Vec<Factory::PoolInfo>,
start_sync_at_block: Option<BlockNumberHash>,
) -> Self {
let web3 = factory_instance.web3();
let updater = Mutex::new(EventHandler::new(
block_retreiver,
BasePoolFactoryContract(base_pool_factory(factory_instance)),
PoolStorage::new(initial_pools, fetcher.clone()),
start_sync_at_block,
));
Self {
web3,
fetcher,
updater,
}
Self { fetcher, updater }
}
}

Expand All @@ -107,17 +98,14 @@ where
}

async fn pools_by_id(&self, pool_ids: HashSet<H256>, block: Block) -> Result<Vec<Pool>> {
let mut batch = Web3CallBatch::new(self.web3.transport().clone());
let block = BlockId::Number(block.into());

let pool_infos = self.updater.lock().await.store().pools_by_id(&pool_ids);
let pool_futures = pool_infos
.into_iter()
.map(|pool_info| self.fetcher.fetch_pool(&pool_info, &mut batch, block))
.map(|pool_info| self.fetcher.fetch_pool(&pool_info, block))
.collect::<Vec<_>>();

batch.execute_all(MAX_BATCH_SIZE).await;

let pools = future::join_all(pool_futures).await;
collect_pool_results(pools)
}
Expand Down
2 changes: 0 additions & 2 deletions crates/shared/src/sources/balancer_v2/pools.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ pub mod weighted;

use {
super::graph_api::PoolData,
crate::ethrpc::Web3CallBatch,
anyhow::Result,
ethcontract::{BlockId, H256},
futures::future::BoxFuture,
Expand Down Expand Up @@ -116,7 +115,6 @@ pub trait FactoryIndexing: Send + Sync + 'static {
// where we can't use other lifetimes here.
// <https://github.com/asomers/mockall/issues/299>
common_pool_state: BoxFuture<'static, common::PoolState>,
batch: &mut Web3CallBatch,
block: BlockId,
) -> BoxFuture<'static, Result<Option<Self::PoolState>>>;
}
Expand Down
81 changes: 30 additions & 51 deletions crates/shared/src/sources/balancer_v2/pools/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
use {
super::{FactoryIndexing, Pool, PoolIndexing as _, PoolStatus},
crate::{
ethrpc::Web3CallBatch,
sources::balancer_v2::{
graph_api::{PoolData, PoolType},
swap::fixed_point::Bfp,
Expand All @@ -13,7 +12,7 @@ use {
anyhow::{anyhow, ensure, Context, Result},
contracts::{BalancerV2BasePool, BalancerV2Vault},
ethcontract::{BlockId, Bytes, H160, H256, U256},
futures::{future::BoxFuture, FutureExt as _},
futures::{future::BoxFuture, FutureExt as _, TryFutureExt},
std::{collections::BTreeMap, future::Future, sync::Arc},
tokio::sync::oneshot,
};
Expand All @@ -34,7 +33,6 @@ where
fn fetch_pool(
&self,
pool: &Factory::PoolInfo,
batch: &mut Web3CallBatch,
block: BlockId,
) -> BoxFuture<'static, Result<PoolStatus>>;
}
Expand Down Expand Up @@ -110,34 +108,32 @@ impl<Factory> PoolInfoFetcher<Factory> {
fn fetch_common_pool_state(
&self,
pool: &PoolInfo,
batch: &mut Web3CallBatch,
block: BlockId,
) -> BoxFuture<'static, Result<PoolState>> {
let pool_contract = self.base_pool_at(pool.address);
let paused = pool_contract
let fetch_paused = pool_contract
.get_paused_state()
.block(block)
.batch_call(batch);
let swap_fee = pool_contract
.get_swap_fee_percentage()
.block(block)
.batch_call(batch);
let balances = self
.call()
.map_ok(|result| result.0);
let fetch_swap_fee = pool_contract.get_swap_fee_percentage().block(block).call();
let fetch_balances = self
.vault
.get_pool_tokens(Bytes(pool.id.0))
.block(block)
.batch_call(batch);
.call();

// Because of a `mockall` limitation, we **need** the future returned
// here to be `'static`. This requires us to clone and move `pool` into
// the async closure - otherwise it would only live for as long as
// `pool`, i.e. `'_`.
let pool = pool.clone();
async move {
let (paused, _, _) = paused.await?;
let swap_fee = Bfp::from_wei(swap_fee.await?);
let (paused, swap_fee, balances) =
futures::try_join!(fetch_paused, fetch_swap_fee, fetch_balances)?;
let swap_fee = Bfp::from_wei(swap_fee);

let (token_addresses, balances, _) = balances.await?;
let (token_addresses, balances, _) = balances;
ensure!(pool.tokens == token_addresses, "pool token mismatch");
let tokens = itertools::izip!(&pool.tokens, balances, &pool.scaling_factors)
.map(|(&address, balance, &scaling_factor)| {
Expand Down Expand Up @@ -180,15 +176,14 @@ where
fn fetch_pool(
&self,
pool_info: &Factory::PoolInfo,
batch: &mut Web3CallBatch,
block: BlockId,
) -> BoxFuture<'static, Result<PoolStatus>> {
let pool_id = pool_info.common().id;
let (common_pool_state, common_pool_state_ok) =
share_common_pool_state(self.fetch_common_pool_state(pool_info.common(), batch, block));
share_common_pool_state(self.fetch_common_pool_state(pool_info.common(), block));
let pool_state =
self.factory
.fetch_pool_state(pool_info, common_pool_state_ok.boxed(), batch, block);
.fetch_pool_state(pool_info, common_pool_state_ok.boxed(), block);

async move {
let common_pool_state = common_pool_state.await?;
Expand Down Expand Up @@ -449,13 +444,10 @@ mod tests {
};

let pool_state = {
let mut batch = Web3CallBatch::new(web3.transport().clone());
let block = web3.eth().block_number().await.unwrap();

let pool_state =
pool_info_fetcher.fetch_common_pool_state(&pool_info, &mut batch, block.into());
let pool_state = pool_info_fetcher.fetch_common_pool_state(&pool_info, block.into());

batch.execute_all(100).await;
pool_state.await.unwrap()
};

Expand Down Expand Up @@ -521,13 +513,10 @@ mod tests {
};

let pool_state = {
let mut batch = Web3CallBatch::new(web3.transport().clone());
let block = web3.eth().block_number().await.unwrap();

let pool_state =
pool_info_fetcher.fetch_common_pool_state(&pool_info, &mut batch, block.into());
let pool_state = pool_info_fetcher.fetch_common_pool_state(&pool_info, block.into());

batch.execute_all(100).await;
pool_state.await
};

Expand Down Expand Up @@ -607,12 +596,11 @@ mod tests {
.with(
predicate::eq(pool_info.clone()),
predicate::always(),
predicate::always(),
predicate::eq(BlockId::from(block)),
)
.returning({
let pool_state = pool_state.clone();
move |_, _, _, _| future::ready(Ok(Some(pool_state.clone()))).boxed()
move |_, _, _| future::ready(Ok(Some(pool_state.clone()))).boxed()
});

let pool_info_fetcher = PoolInfoFetcher {
Expand All @@ -621,13 +609,10 @@ mod tests {
token_infos: Arc::new(MockTokenInfoFetching::new()),
};

let pool_status = {
let mut batch = Web3CallBatch::new(web3.transport().clone());
let pool_state = pool_info_fetcher.fetch_pool(&pool_info, &mut batch, block.into());

batch.execute_all(100).await;
pool_state.await.unwrap()
};
let pool_status = pool_info_fetcher
.fetch_pool(&pool_info, block.into())
.await
.unwrap();

assert_eq!(
pool_status,
Expand Down Expand Up @@ -661,9 +646,8 @@ mod tests {
predicate::always(),
predicate::always(),
predicate::always(),
predicate::always(),
)
.returning(|_, _, _, _| {
.returning(|_, _, _| {
future::ready(Ok(Some(weighted::PoolState {
swap_fee: Bfp::zero(),
tokens: Default::default(),
Expand All @@ -689,13 +673,11 @@ mod tests {
};

let pool_status = {
let mut batch = Web3CallBatch::new(web3.transport().clone());
let block = web3.eth().block_number().await.unwrap();

let pool_state = pool_info_fetcher.fetch_pool(&pool_info, &mut batch, block.into());

batch.execute_all(100).await;
pool_state.await.unwrap()
pool_info_fetcher
.fetch_pool(&pool_info, block.into())
.await
.unwrap()
};

assert_eq!(pool_status, PoolStatus::Paused);
Expand Down Expand Up @@ -724,9 +706,8 @@ mod tests {
predicate::always(),
predicate::always(),
predicate::always(),
predicate::always(),
)
.returning(|_, _, _, _| future::ready(Ok(None)).boxed());
.returning(|_, _, _| future::ready(Ok(None)).boxed());

let pool_info_fetcher = PoolInfoFetcher {
vault: BalancerV2Vault::at(&web3, vault.address()),
Expand All @@ -745,13 +726,11 @@ mod tests {
};

let pool_status = {
let mut batch = Web3CallBatch::new(web3.transport().clone());
let block = web3.eth().block_number().await.unwrap();

let pool_state = pool_info_fetcher.fetch_pool(&pool_info, &mut batch, block.into());

batch.execute_all(100).await;
pool_state.await.unwrap()
pool_info_fetcher
.fetch_pool(&pool_info, block.into())
.await
.unwrap()
};

assert_eq!(pool_status, PoolStatus::Disabled);
Expand Down
Loading

0 comments on commit d31e240

Please sign in to comment.