Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

DEX solvers rate limiter #2071

Merged
merged 16 commits into from
Nov 30, 2023
88 changes: 86 additions & 2 deletions crates/shared/src/rate_limiter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ impl RateLimiter {
}
}

#[derive(Error, Debug, Clone, Default)]
#[derive(Error, Debug, Clone, Default, PartialEq)]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this required somewhere or just for completeness?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not required anymore for the latest code revision.

pub enum RateLimiterError {
#[default]
#[error("rate limited")]
Expand Down Expand Up @@ -221,6 +221,30 @@ impl RateLimiter {

Ok(result)
}

/// Executes `task`, first honoring any active rate-limiting cool-down.
///
/// Unlike `execute`, which fails fast while requests are being dropped,
/// this variant sleeps until the current back-off window has elapsed and
/// only then delegates to `execute`.
pub async fn execute_with_back_off<T>(
    &self,
    task: impl Future<Output = T>,
    requires_back_off: impl Fn(&T) -> bool,
) -> Result<T, RateLimiterError> {
    // Wait out whatever remains of the back-off window before executing.
    match self.get_back_off_duration_if_limited() {
        Some(remaining) => tokio::time::sleep(remaining).await,
        None => {}
    }
    self.execute(task, requires_back_off).await
}

/// Returns the time left until requests are accepted again, or `None`
/// when the limiter is not currently dropping requests.
fn get_back_off_duration_if_limited(&self) -> Option<Duration> {
    let strategy = self.strategy.lock().unwrap();
    let now = Instant::now();
    // Only a strictly-future deadline means we are still rate limited.
    (strategy.drop_requests_until > now).then(|| strategy.drop_requests_until - now)
}
}

/// Shared module with common back-off checks.
Expand All @@ -236,7 +260,7 @@ pub mod back_off {

#[cfg(test)]
mod tests {
use {super::*, futures::FutureExt, tokio::time::sleep};
use {super::*, futures::FutureExt, std::ops::Add, tokio::time::sleep};

#[test]
fn current_back_off_does_not_panic() {
Expand Down Expand Up @@ -317,4 +341,64 @@ mod tests {
rate_limiter.strategy().get_current_back_off()
);
}

#[tokio::test]
async fn test_execute_with_no_back_off() {
// A 30 s window would be very noticeable if we accidentally waited on it.
let timeout = Duration::from_secs(30);
let strategy = RateLimitingStrategy::try_new(1.0, timeout, timeout).unwrap();
let original_drop_until = strategy.drop_requests_until;
let rate_limiter = RateLimiter::from_strategy(strategy, "test_no_back_off".to_string());

// now_or_never() polls exactly once, so a successful unwrap proves the
// call resolved immediately — i.e. no sleep happened without back-off.
let result = rate_limiter
.execute_with_back_off(async { 1 }, |_| false)
.now_or_never()
.unwrap()
.unwrap();

assert_eq!(result, 1);
{
let current_strategy = rate_limiter.strategy.lock().unwrap();
// A successful call must not have extended the drop window.
assert!(current_strategy.drop_requests_until < original_drop_until.add(timeout));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is to assert that we didn't have to wait for the allowed backoff time, right?
I think the current code might be written such that you can even assert that we didn't have to wait at all.
For that you can replace .await with now_or_never().unwrap(). now_or_never() polls the future exactly once and only returns a result if it immediately resolved.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Replaced

}

// A plain execute() also succeeds, since no back-off was ever started.
let result = rate_limiter.execute(async { 1 }, |_| false).await.unwrap();
assert_eq!(result, 1);
}

#[tokio::test]
async fn test_execute_with_back_off() {
// A short window keeps the final (real) sleep in this test fast.
let timeout = Duration::from_millis(50);
let strategy = RateLimitingStrategy::try_new(1.0, timeout, timeout).unwrap();
let original_drop_until = strategy.drop_requests_until;
let rate_limiter = RateLimiter::from_strategy(strategy, "test_back_off".to_string());

// start the back off
let result = rate_limiter
.execute_with_back_off(async { 1 }, |_| true)
.await
.unwrap();

assert_eq!(result, 1);
let drop_until = {
let current_strategy = rate_limiter.strategy.lock().unwrap();
let drop_until = current_strategy.drop_requests_until;
// The drop window must have been pushed at least `timeout` ahead.
assert!(drop_until >= original_drop_until.add(timeout));
drop_until
};

// back off is not over, expecting a RateLimiterError
let result = rate_limiter.execute(async { 1 }, |_| false).await;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Would be nice to have some comments what each individual block is testing.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added

assert_eq!(result, Err(RateLimiterError::RateLimited));
{
let current_strategy = rate_limiter.strategy.lock().unwrap();
// A rejected call must not extend the existing drop window.
assert_eq!(current_strategy.drop_requests_until, drop_until);
}

// back off is over: execute_with_back_off sleeps out the remaining window
// and then succeeds.
let result = rate_limiter
.execute_with_back_off(async { 1 }, |_| false)
.await
.unwrap();
assert_eq!(result, 1);
}
}
1 change: 1 addition & 0 deletions crates/solvers/src/boundary/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,6 @@ pub mod baseline;
pub mod legacy;
pub mod liquidity;
pub mod naive;
pub mod rate_limiter;

pub type Result<T> = anyhow::Result<T>;
58 changes: 58 additions & 0 deletions crates/solvers/src/boundary/rate_limiter.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
use {
anyhow::Result,
shared::rate_limiter::{
RateLimiter as SharedRateLimiter,
RateLimiterError as SharedRateLimiterError,
RateLimitingStrategy as SharedRateLimitingStrategy,
},
std::{future::Future, time::Duration},
thiserror::Error,
};

/// Boundary wrapper around the shared rate limiter used by the DEX solvers.
pub struct RateLimiter {
// Delegates all rate-limiting behavior to the shared-crate implementation.
inner: SharedRateLimiter,
Copy link
Contributor

@sunce86 sunce86 Nov 28, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: I would prefer to have here
inner: shared::RateLimiter
to be consistent with how we refer to shared components from the boundary.

Copy link
Contributor Author

@squadgazzz squadgazzz Nov 28, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That actually shared::rate_limiter::RateLimiter which is quite long and will be used across the code. That's why I decided to stick with the shorter versions.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You should be able to export RateLimiter to shared with pub use and therefore skip ::rate_limiter

}

/// Boundary wrapper around the shared rate-limiting strategy configuration.
#[derive(Debug, Clone)]
pub struct RateLimitingStrategy {
// The shared-crate strategy this boundary type delegates to.
inner: SharedRateLimitingStrategy,
}

impl RateLimitingStrategy {
    /// Builds a strategy, validating the back-off configuration.
    ///
    /// Forwards the parameters to the shared constructor and wraps the
    /// validated result in the boundary type; fails when the shared
    /// constructor rejects the values.
    pub fn try_new(
        back_off_growth_factor: f64,
        min_back_off: Duration,
        max_back_off: Duration,
    ) -> Result<Self> {
        let inner =
            SharedRateLimitingStrategy::try_new(back_off_growth_factor, min_back_off, max_back_off)?;
        Ok(Self { inner })
    }
}

/// Errors surfaced by the boundary `RateLimiter`.
#[derive(Error, Debug, Clone, Default)]
pub enum RateLimiterError {
/// The call was rejected because the rate limiter is backing off.
#[default]
#[error("rate limited")]
RateLimited,
}

impl RateLimiter {
    /// Creates a boundary rate limiter from a validated strategy.
    ///
    /// `name` labels this limiter (e.g. for metrics/logging in the shared
    /// implementation).
    pub fn new(strategy: RateLimitingStrategy, name: String) -> Self {
        let inner = SharedRateLimiter::from_strategy(strategy.inner, name);
        Self { inner }
    }

    /// Runs `task` through the shared limiter, waiting out any active
    /// back-off first, and translates the shared error into the boundary
    /// error type.
    pub async fn execute_with_back_off<T>(
        &self,
        task: impl Future<Output = T>,
        requires_back_off: impl Fn(&T) -> bool,
    ) -> Result<T, RateLimiterError> {
        let result = self
            .inner
            .execute_with_back_off(task, requires_back_off)
            .await;
        result.map_err(|err| match err {
            // The shared enum currently has a single variant; keep the match
            // exhaustive so new variants force an explicit mapping here.
            SharedRateLimiterError::RateLimited => RateLimiterError::RateLimited,
        })
    }
}
100 changes: 68 additions & 32 deletions crates/solvers/src/domain/solver/dex/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,15 @@

use {
crate::{
domain,
domain::{auction, dex::slippage, order, solution, solver::dex::fills::Fills},
boundary::rate_limiter::{RateLimiter, RateLimiterError},
domain::{
self,
auction,
dex::{self, slippage},
order::{self, Order},
solution,
solver::dex::fills::Fills,
},
infra,
},
futures::{future, stream, FutureExt, StreamExt},
Expand Down Expand Up @@ -33,10 +40,14 @@ pub struct Dex {

/// Parameters used to calculate the revert risk of a solution.
risk: domain::Risk,

/// Handles 429 Too Many Requests error with a retry mechanism
rate_limiter: RateLimiter,
}

impl Dex {
pub fn new(dex: infra::dex::Dex, config: infra::config::dex::Config) -> Self {
let rate_limiter = RateLimiter::new(config.rate_limiting_strategy, "dex_api".to_string());
Self {
dex,
simulator: infra::dex::Simulator::new(
Expand All @@ -48,6 +59,7 @@ impl Dex {
concurrent_requests: config.concurrent_requests,
fills: Fills::new(config.smallest_partial_fill),
risk: config.risk,
rate_limiter,
}
}

Expand Down Expand Up @@ -86,42 +98,66 @@ impl Dex {
.filter_map(future::ready)
}

async fn solve_order(
async fn try_solve(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since we don't retry until we get a non-rate-limited result we could still skip orders.
IMO this is fine but I think this is not what you wanted to avoid in the initial implementation.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will create a separate issue to consider using a retry mechanism to not overload this PR

&self,
order: order::UserOrder<'_>,
order: &Order,
dex_order: &dex::Order,
tokens: &auction::Tokens,
gas_price: auction::GasPrice,
) -> Option<solution::Solution> {
let order = order.get();
let swap = {
let order = self.fills.dex_order(order, tokens)?;
let slippage = self.slippage.relative(&order.amount(), tokens);
self.dex.swap(&order, &slippage, tokens, gas_price).await
};

let swap = match swap {
Ok(swap) => swap,
Err(err @ infra::dex::Error::NotFound) => {
if order.partially_fillable {
// Only adjust the amount to try next if we are sure the API worked correctly
// yet still wasn't able to provide a swap.
self.fills.reduce_next_try(order.uid);
} else {
tracing::debug!(?err, "skipping order");
) -> Option<dex::Swap> {
let dex_err_handler = |err: infra::dex::Error| {
match &err {
err @ infra::dex::Error::NotFound => {
if order.partially_fillable {
// Only adjust the amount to try next if we are sure the API
// worked
// correctly yet still wasn't able to provide a
// swap.
self.fills.reduce_next_try(order.uid);
} else {
tracing::debug!(?err, "skipping order");
}
}
err @ infra::dex::Error::OrderNotSupported => {
tracing::debug!(?err, "skipping order")
}
err @ infra::dex::Error::RateLimited => {
tracing::debug!(?err, "encountered rate limit")
}
infra::dex::Error::Other(err) => {
tracing::warn!(?err, "failed to get swap")
}
return None;
}
Err(err @ infra::dex::Error::OrderNotSupported) => {
tracing::debug!(?err, "skipping order");
return None;
}
Err(infra::dex::Error::Other(err)) => {
tracing::warn!(?err, "failed to get swap");
return None;
}
err
};
let swap = async {
let slippage = self.slippage.relative(&dex_order.amount(), tokens);
self.dex
.swap(dex_order, &slippage, tokens, gas_price)
.await
.map_err(dex_err_handler)
};
self.rate_limiter
.execute_with_back_off(swap, |result| {
matches!(result, Err(infra::dex::Error::RateLimited))
})
.await
.map_err(|err| match err {
RateLimiterError::RateLimited => infra::dex::Error::RateLimited,
})
.and_then(|result| result)
.ok()
}

let uid = order.uid;
async fn solve_order(
&self,
order: order::UserOrder<'_>,
tokens: &auction::Tokens,
gas_price: auction::GasPrice,
) -> Option<solution::Solution> {
let order = order.get();
let dex_order = self.fills.dex_order(order, tokens)?;
let swap = self.try_solve(order, &dex_order, tokens, gas_price).await?;
let sell = tokens.reference_price(&order.sell.token);
let Some(solution) = swap
.into_solution(order.clone(), gas_price, sell, &self.risk, &self.simulator)
Expand All @@ -133,7 +169,7 @@ impl Dex {

tracing::debug!("solved");
// Maybe some liquidity appeared that enables a bigger fill.
self.fills.increase_next_try(uid);
self.fills.increase_next_try(order.uid);

Some(solution.with_buffers_internalizations(tokens))
}
Expand Down
33 changes: 32 additions & 1 deletion crates/solvers/src/infra/config/dex/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,15 @@

use {
crate::{
boundary::rate_limiter::RateLimitingStrategy,
domain::{dex::slippage, eth, Risk},
infra::{blockchain, config::unwrap_or_log, contracts},
util::serialize,
},
bigdecimal::BigDecimal,
serde::{de::DeserializeOwned, Deserialize},
serde_with::serde_as,
std::{fmt::Debug, num::NonZeroUsize, path::Path},
std::{fmt::Debug, num::NonZeroUsize, path::Path, time::Duration},
tokio::fs,
};

Expand Down Expand Up @@ -48,6 +49,18 @@ struct Config {
/// (gas_amount_factor, gas_price_factor, nmb_orders_factor, intercept)
risk_parameters: (f64, f64, f64, f64),

/// Back-off growth factor for rate limiting.
#[serde(default = "default_back_off_growth_factor")]
back_off_growth_factor: f64,

/// Minimum back-off time in seconds for rate limiting.
#[serde(default = "default_min_back_off")]
min_back_off: u64,

/// Maximum back-off time in seconds for rate limiting.
#[serde(default = "default_max_back_off")]
max_back_off: u64,

/// Settings specific to the wrapped dex API.
dex: toml::Value,
}
Expand All @@ -64,6 +77,18 @@ fn default_smallest_partial_fill() -> eth::U256 {
eth::U256::exp10(16) // 0.01 ETH
}

/// Default back-off growth factor: 1.0, i.e. the back-off duration does not
/// grow between consecutive rate-limited attempts.
fn default_back_off_growth_factor() -> f64 {
    1.0
}

/// Default minimum back-off in seconds.
///
/// Spelled as an explicit `0` rather than `Default::default()` so the
/// configured default is visible at a glance.
fn default_min_back_off() -> u64 {
    0
}

/// Default maximum back-off in seconds.
///
/// Spelled as an explicit `0` rather than `Default::default()` so the
/// configured default is visible at a glance.
// NOTE(review): a 0 s maximum appears to leave back-off effectively disabled
// unless configured explicitly — confirm that is intended.
fn default_max_back_off() -> u64 {
    0
}

/// Loads the base solver configuration from a TOML file.
///
/// # Panics
Expand Down Expand Up @@ -117,6 +142,12 @@ pub async fn load<T: DeserializeOwned>(path: &Path) -> (super::Config, T) {
nmb_orders_factor: config.risk_parameters.2,
intercept: config.risk_parameters.3,
},
rate_limiting_strategy: RateLimitingStrategy::try_new(
config.back_off_growth_factor,
Duration::from_secs(config.min_back_off),
Duration::from_secs(config.max_back_off),
)
.unwrap(),
};
(config, dex)
}
Loading
Loading