Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Colocation rc #2074

Closed
wants to merge 24 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
76e08ac
Fixed uniswap like implementation
MartinquaXD Nov 16, 2023
7434628
Omit requests for missing balancer pools
MartinquaXD Nov 16, 2023
8a03d12
cargo fmt and fix unit tests
MartinquaXD Nov 17, 2023
6fe5fe1
Merge remote-tracking branch 'origin/main' into omit-requests-for-kno…
MartinquaXD Nov 17, 2023
99f9ffa
Remove manual batching from balancer liquidity fetching
MartinquaXD Nov 17, 2023
4be15ce
Drop manual batching from uni v2 liquidity fetching
MartinquaXD Nov 17, 2023
4ff95cd
Drop unit tests which required delayed execution of batch calls
MartinquaXD Nov 17, 2023
b6d9d10
Share liquidity fetch requests for identical keys
MartinquaXD Nov 17, 2023
0945000
Introduce request sharing in the liquidity cache level
MartinquaXD Nov 17, 2023
1adccd5
Share individual requests instead of batches
MartinquaXD Nov 17, 2023
a4530f5
Move request sharing GC to background task
MartinquaXD Nov 17, 2023
b2eb99d
Cache initialized ERC20 and uniswap contracts
MartinquaXD Nov 18, 2023
edcb0ba
Cache balancer pool contract instances
MartinquaXD Nov 18, 2023
98267db
Only instantiate ERC20 contracts once while fetching liquidity
MartinquaXD Nov 18, 2023
9a6db4d
DEX solvers rate limiter
squadgazzz Nov 22, 2023
4c9c8e9
Retry mechanism
squadgazzz Nov 22, 2023
cf6e0dc
Tests
squadgazzz Nov 22, 2023
432d0e9
Use default retries = 1
squadgazzz Nov 22, 2023
f688b18
Review fixes
squadgazzz Nov 23, 2023
58764c4
Remove redundant Arc
squadgazzz Nov 23, 2023
840f59f
Minor fixes
squadgazzz Nov 23, 2023
33d1a1e
Merge branch 'main' into dex-solver-rate-limit
squadgazzz Nov 24, 2023
ccd425d
Merge branch 'share-liquidity-fetch-requests' into colocation_rc
fleupold Nov 27, 2023
492b1b9
Merge branch 'dex-solver-rate-limit' into colocation_rc
fleupold Nov 27, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion crates/driver/src/boundary/liquidity/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ const BLOCK_POLL_INTERVAL: Duration = Duration::from_secs(1);
fn cache_config() -> CacheConfig {
CacheConfig {
number_of_blocks_to_cache: NonZeroU64::new(10).unwrap(),
number_of_entries_to_auto_update: NonZeroUsize::new(200).unwrap(),
number_of_entries_to_auto_update: NonZeroUsize::new(1000).unwrap(),
maximum_recent_block_age: 4,
max_retries: 5,
delay_between_retries: Duration::from_secs(1),
Expand Down
10 changes: 5 additions & 5 deletions crates/driver/src/boundary/liquidity/uniswap/v2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,11 +150,11 @@ where
init_code_digest: config.pool_code.into(),
};

let pool_fetcher = PoolFetcher::new(
reader(web3.clone(), pair_provider),
web3.clone(),
config.missing_pool_cache_time,
);
let pool_fetcher = PoolFetcher {
pool_reader: reader(web3.clone(), pair_provider),
web3: web3.clone(),
non_existent_pools: Default::default(),
};

let pool_cache = Arc::new(PoolCache::new(
boundary::liquidity::cache_config(),
Expand Down
16 changes: 10 additions & 6 deletions crates/driver/src/domain/competition/auction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,7 @@ impl AuctionProcessor {

/// Fetches the tradable balance for every order owner.
async fn fetch_balances(ethereum: &infra::Ethereum, orders: &[order::Order]) -> Balances {
let mut tokens: HashMap<_, _> = Default::default();
// Collect trader/token/source/interaction tuples for fetching available
// balances. Note that we are pessimistic here, if a trader is selling
// the same token with the same source in two different orders using a
Expand All @@ -297,6 +298,7 @@ impl AuctionProcessor {
.map(|((trader, token, source), mut orders)| {
let first = orders.next().expect("group contains at least 1 order");
let mut others = orders;
tokens.entry(token).or_insert_with(|| ethereum.erc20(token));
if others.all(|order| order.pre_interactions == first.pre_interactions) {
(trader, token, source, &first.pre_interactions[..])
} else {
Expand All @@ -308,16 +310,18 @@ impl AuctionProcessor {
join_all(
traders
.into_iter()
.map(|(trader, token, source, interactions)| async move {
let balance = ethereum
.erc20(token)
.tradable_balance(trader.into(), source, interactions)
.await;
.map(|(trader, token, source, interactions)| {
let token_contract = tokens.get(&token);
let token_contract = token_contract.expect("all tokens where created earlier");
let fetch_balance = token_contract.tradable_balance(trader.into(), source, interactions);

async move {
let balance = fetch_balance.await;
(
(trader, token, source),
balance.map(order::SellAmount::from).ok(),
)
}),
}}),
)
.await
.into_iter()
Expand Down
84 changes: 82 additions & 2 deletions crates/shared/src/rate_limiter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ impl RateLimiter {
}
}

#[derive(Error, Debug, Clone, Default)]
#[derive(Error, Debug, Clone, Default, PartialEq)]
pub enum RateLimiterError {
#[default]
#[error("rate limited")]
Expand Down Expand Up @@ -221,6 +221,30 @@ impl RateLimiter {

Ok(result)
}

pub async fn execute_with_back_off<T>(
&self,
task: impl Future<Output = T>,
requires_back_off: impl Fn(&T) -> bool,
) -> Result<T, RateLimiterError> {
if let Some(back_off_duration) = self.get_back_off_duration_if_limited() {
tokio::time::sleep(back_off_duration).await;
}

self.execute(task, requires_back_off).await
}

fn get_back_off_duration_if_limited(&self) -> Option<Duration> {
let strategy = self.strategy.lock().unwrap();
let now = Instant::now();

if strategy.drop_requests_until > now {
let back_off_duration = strategy.drop_requests_until - now;
Some(back_off_duration)
} else {
None
}
}
}

/// Shared module with common back-off checks.
Expand All @@ -236,7 +260,7 @@ pub mod back_off {

#[cfg(test)]
mod tests {
use {super::*, futures::FutureExt, tokio::time::sleep};
use {super::*, futures::FutureExt, std::ops::Add, tokio::time::sleep};

#[test]
fn current_back_off_does_not_panic() {
Expand Down Expand Up @@ -317,4 +341,60 @@ mod tests {
rate_limiter.strategy().get_current_back_off()
);
}

#[tokio::test]
async fn test_execute_with_no_back_off() {
let timeout = Duration::from_secs(30);
let strategy = RateLimitingStrategy::try_new(1.0, timeout, timeout).unwrap();
let original_drop_until = strategy.drop_requests_until;
let rate_limiter = RateLimiter::from_strategy(strategy, "test_no_back_off".to_string());

let result = rate_limiter
.execute_with_back_off(async { 1 }, |_| false)
.await
.unwrap();

assert_eq!(result, 1);
{
let current_strategy = rate_limiter.strategy.lock().unwrap();
assert!(current_strategy.drop_requests_until < original_drop_until.add(timeout));
}

let result = rate_limiter.execute(async { 1 }, |_| false).await.unwrap();
assert_eq!(result, 1);
}

#[tokio::test]
async fn test_execute_with_back_off() {
let timeout = Duration::from_secs(3);
let strategy = RateLimitingStrategy::try_new(1.0, timeout, timeout).unwrap();
let original_drop_until = strategy.drop_requests_until;
let rate_limiter = RateLimiter::from_strategy(strategy, "test_back_off".to_string());

let result = rate_limiter
.execute_with_back_off(async { 1 }, |_| true)
.await
.unwrap();

assert_eq!(result, 1);
let drop_until = {
let current_strategy = rate_limiter.strategy.lock().unwrap();
let drop_until = current_strategy.drop_requests_until;
assert!(drop_until >= original_drop_until.add(timeout));
drop_until
};

let result = rate_limiter.execute(async { 1 }, |_| false).await;
assert_eq!(result, Err(RateLimiterError::RateLimited));
{
let current_strategy = rate_limiter.strategy.lock().unwrap();
assert_eq!(current_strategy.drop_requests_until, drop_until);
}

let result = rate_limiter
.execute_with_back_off(async { 1 }, |_| false)
.await
.unwrap();
assert_eq!(result, 1);
}
}
Loading
Loading