Skip to content

Commit

Permalink
Handle Paraswap rate limiting (#168)
Browse files Browse the repository at this point in the history
  • Loading branch information
MartinquaXD authored Apr 29, 2022
1 parent eaf91b5 commit a9eb00a
Show file tree
Hide file tree
Showing 7 changed files with 261 additions and 15 deletions.
1 change: 1 addition & 0 deletions crates/orderbook/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -520,6 +520,7 @@ async fn main() {
Arc::new(DefaultParaswapApi {
client: client.clone(),
partner: args.shared.paraswap_partner.clone().unwrap_or_default(),
rate_limiter: args.shared.paraswap_rate_limiter.clone().map(Into::into),
}),
token_info_fetcher.clone(),
args.shared.disabled_paraswap_dexs.clone(),
Expand Down
39 changes: 38 additions & 1 deletion crates/shared/src/arguments.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
//! Contains command line arguments and related helpers that are shared between the binaries.
use crate::{
gas_price_estimation::GasEstimatorType,
http_client::RateLimitingStrategy,
sources::{balancer_v2::BalancerFactoryKind, BaselineSource},
};
use anyhow::{ensure, Result};
use anyhow::{ensure, Context, Result};
use ethcontract::{H160, U256};
use std::{
num::{NonZeroU64, ParseFloatError},
Expand Down Expand Up @@ -101,6 +102,15 @@ pub struct Arguments {
#[clap(long, env, default_value = "ParaSwapPool4", use_value_delimiter = true)]
pub disabled_paraswap_dexs: Vec<String>,

/// Configures the back off strategy for the paraswap API when our requests get rate limited.
/// Requests issued while back off is active get dropped entirely.
/// Needs to be passed as "<back_off_growth_factor>,<min_back_off>,<max_back_off>".
/// back_off_growth_factor: f64 > 1.0
/// min_back_off: f64 in seconds
/// max_back_off: f64 in seconds
#[clap(long, env, verbatim_doc_comment)]
pub paraswap_rate_limiter: Option<RateLimitingStrategy>,

#[clap(long, env)]
pub zeroex_url: Option<String>,

Expand Down Expand Up @@ -165,3 +175,30 @@ pub fn wei_from_gwei(s: &str) -> anyhow::Result<f64> {
let in_gwei: f64 = s.parse()?;
Ok(in_gwei * 1e9)
}

impl FromStr for RateLimitingStrategy {
type Err = anyhow::Error;

fn from_str(config: &str) -> Result<Self> {
let mut parts = config.split(',');
let back_off_growth_factor = parts
.next()
.ok_or_else(|| anyhow::anyhow!("missing back_off_growth_factor"))?;
let min_back_off = parts
.next()
.ok_or_else(|| anyhow::anyhow!("missing min_back_off"))?;
let max_back_off = parts
.next()
.ok_or_else(|| anyhow::anyhow!("missing max_back_off"))?;
ensure!(
parts.next().is_none(),
"extraneous rate limiting parameters"
);
let back_off_growth_factor: f64 = back_off_growth_factor
.parse()
.context("parsing back_off_growth_factor")?;
let min_back_off = duration_from_seconds(min_back_off).context("parsing min_back_off")?;
let max_back_off = duration_from_seconds(max_back_off).context("parsing max_back_off")?;
Self::try_new(back_off_growth_factor, min_back_off, max_back_off)
}
}
204 changes: 200 additions & 4 deletions crates/shared/src/http_client.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
use anyhow::{anyhow, Result};
use reqwest::Response;
use anyhow::{anyhow, ensure, Result};
use reqwest::{RequestBuilder, Response};
use std::{
sync::{Mutex, MutexGuard},
time::{Duration, Instant},
};

/// Extracts the bytes of the response up to some size limit.
///
Expand All @@ -19,11 +23,144 @@ pub async fn response_body_with_size_limit(
Ok(bytes)
}

#[derive(Debug, Clone)]
pub struct RateLimitingStrategy {
drop_requests_until: Instant,
/// How many requests got rate limited in a row.
times_rate_limited: u64,
back_off_growth_factor: f64,
min_back_off: Duration,
max_back_off: Duration,
}

impl RateLimitingStrategy {
pub fn try_new(
back_off_growth_factor: f64,
min_back_off: Duration,
max_back_off: Duration,
) -> Result<Self> {
ensure!(
back_off_growth_factor.is_normal(),
"back_off_growth_factor must be a normal f64"
);
ensure!(
back_off_growth_factor > 1.0,
"back_off_growth_factor needs to be greater than 1.0"
);
ensure!(
min_back_off <= max_back_off,
"min_back_off needs to be <= max_back_off"
);
Ok(Self {
drop_requests_until: Instant::now(),
times_rate_limited: 0,
back_off_growth_factor,
min_back_off,
max_back_off,
})
}
}

impl RateLimitingStrategy {
/// Resets back off and stops rate limiting requests.
pub fn response_ok(&mut self) {
self.times_rate_limited = 0;
self.drop_requests_until = Instant::now();
}

/// Calculates back off based on how often we got rate limited in a row.
fn get_current_back_off(&self) -> Duration {
let factor = self
.back_off_growth_factor
.powf(self.times_rate_limited as f64);
let back_off_secs = self.min_back_off.as_secs_f64() * factor;
if !back_off_secs.is_normal() || back_off_secs < 0. || back_off_secs > u64::MAX as f64 {
// This would cause a panic in `Duration::from_secs_f64()`
// TODO refactor this when `Duration::try_from_secs_f64()` gets stabilized:
// https://doc.rust-lang.org/stable/std/time/struct.Duration.html#method.try_from_secs_f64
return self.max_back_off;
}
let current_back_off = Duration::from_secs_f64(back_off_secs);
std::cmp::min(self.max_back_off, current_back_off)
}

/// Returns updated back off if no other thread increased it in the mean time.
pub fn response_rate_limited(&mut self, previous_rate_limits: u64) -> Option<Duration> {
if self.times_rate_limited != previous_rate_limits {
// Don't increase back off if somebody else already updated it in the meantime.
return None;
}

self.times_rate_limited += 1;
let new_back_off = self.get_current_back_off();
self.drop_requests_until = Instant::now() + new_back_off;
Some(new_back_off)
}

/// Returns number of times we got rate limited in a row if we are currently allowing requests.
pub fn times_rate_limited(&self, now: Instant) -> Option<u64> {
if self.drop_requests_until > now {
return None;
}

Some(self.times_rate_limited)
}
}

#[derive(Debug)]
pub struct RateLimiter {
pub strategy: Mutex<RateLimitingStrategy>,
}

impl RateLimiter {
fn strategy(&self) -> MutexGuard<RateLimitingStrategy> {
self.strategy.lock().unwrap()
}
}

impl From<RateLimitingStrategy> for RateLimiter {
fn from(strategy: RateLimitingStrategy) -> Self {
Self {
strategy: Mutex::new(strategy),
}
}
}

impl RateLimiter {
/// If a request receives the response "Too many requests" (status code 429) future requests
/// will get dropped for some time. Every successive 429 response increases that time exponentially.
/// When a request eventually returns a normal result again future requests will no longer get
/// dropped until the next 429 response occurs.
pub async fn request(&self, request: RequestBuilder) -> Result<Response> {
let times_rate_limited = match self.strategy().times_rate_limited(Instant::now()) {
None => {
tracing::warn!("dropping request because API is currently rate limited");
anyhow::bail!("backing off rate limit");
}
Some(times_rate_limited) => times_rate_limited,
};

let response = request.send().await?;

if response.status() == 429 {
if let Some(new_back_off) = self.strategy().response_rate_limited(times_rate_limited) {
tracing::warn!("extended rate limiting for {:?}", new_back_off);
}
anyhow::bail!("rate limited");
} else {
self.strategy().response_ok();
tracing::debug!("reset rate limit");
Ok(response)
}
}
}

#[cfg(test)]
mod tests {
use reqwest::Client;

use super::*;
use futures::stream::{self, StreamExt};
use reqwest::Client;
use tokio::time::{sleep_until, Instant as TokioInstant};

#[tokio::test]
#[ignore]
Expand All @@ -43,4 +180,63 @@ mod tests {
let text = std::str::from_utf8(&bytes).unwrap();
dbg!(text);
}

#[tokio::test]
#[ignore]
async fn rate_limited_requests() {
let client = Client::default();

let url = "https://apiv5.paraswap.io/prices?srcToken=0x99d8a9c45b2eca8864373a26d1459e3dff1e17f3&destToken=0xa0b86991c6218b36c1d19d4a2e9eb0ce3606eb48&srcDecimals=18&destDecimals=6&amount=100000000&side=BUY&network=1&excludeDEXS=ParaSwapPool4";
let strategy = RateLimitingStrategy::try_new(
2.0,
Duration::from_millis(16),
Duration::from_millis(20_000),
)
.unwrap();
let rate_limiter = RateLimiter::from(strategy);
// note that 1_000 requests will not always trigger a rate limit
let mut stream = stream::iter(0..1_000).map(|_| async {}).buffer_unordered(2);
while stream.next().await.is_some() {
let request = client.get(url);
let response = rate_limiter.request(request).await;
match &response {
Ok(response) => println!("{}", response.status()),
Err(e) => {
println!("error: {}", e);
let instant = rate_limiter.strategy.lock().unwrap().drop_requests_until;
println!(
"sleeping for {} milliseconds",
instant.duration_since(Instant::now()).as_millis()
);
sleep_until(TokioInstant::from_std(instant)).await;
}
}
}
}

#[test]
fn current_back_off_does_not_panic() {
let max = Duration::from_secs(60);
let back_off = RateLimitingStrategy {
drop_requests_until: Instant::now(),
times_rate_limited: 1,
// internal calculations don't overflow `Duration`
back_off_growth_factor: f64::MAX,
min_back_off: Duration::from_millis(16),
max_back_off: max,
}
.get_current_back_off();
assert_eq!(max, back_off);

let max = Duration::from_secs(60);
let back_off = RateLimitingStrategy {
drop_requests_until: Instant::now(),
times_rate_limited: 3,
back_off_growth_factor: 2.,
min_back_off: Duration::from_millis(16),
max_back_off: max,
}
.get_current_back_off();
assert_eq!(Duration::from_millis(16 * 8), back_off);
}
}
26 changes: 16 additions & 10 deletions crates/shared/src/paraswap_api.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::debug_bytes;
use crate::{debug_bytes, http_client::RateLimiter};
use anyhow::Result;
use derivative::Derivative;
use ethcontract::{H160, U256};
Expand Down Expand Up @@ -28,14 +28,19 @@ pub trait ParaswapApi: Send + Sync {
pub struct DefaultParaswapApi {
pub client: Client,
pub partner: String,
pub rate_limiter: Option<RateLimiter>,
}

#[async_trait::async_trait]
impl ParaswapApi for DefaultParaswapApi {
async fn price(&self, query: PriceQuery) -> Result<PriceResponse, ParaswapResponseError> {
let url = query.into_url(&self.partner);
tracing::debug!("Querying Paraswap price API: {}", url);
let response = self.client.get(url).send().await?;
let request = self.client.get(url);
let response = match &self.rate_limiter {
Some(limiter) => limiter.request(request).await?,
None => request.send().await?,
};
let status = response.status();
let text = response.text().await?;
tracing::debug!(%status, %text, "Response from Paraswap price API");
Expand All @@ -49,12 +54,12 @@ impl ParaswapApi for DefaultParaswapApi {
query,
partner: &self.partner,
};
let response_text = query
.into_request(&self.client)
.send()
.await?
.text()
.await?;
let request = query.into_request(&self.client);
let response = match &self.rate_limiter {
Some(limiter) => limiter.request(request).await?,
None => request.send().await?,
};
let response_text = response.text().await?;
parse_paraswap_response_text(&response_text)
}
}
Expand Down Expand Up @@ -84,7 +89,7 @@ pub enum ParaswapResponseError {
Retryable(String),

#[error("other ParaSwap error: {0}")]
Other(String),
Other(#[from] anyhow::Error),
}

impl ParaswapResponseError {
Expand Down Expand Up @@ -119,7 +124,7 @@ where
| "Too much slippage on quote, please try again" => {
Err(ParaswapResponseError::InsufficientLiquidity(message))
}
_ => Err(ParaswapResponseError::Other(message)),
_ => Err(ParaswapResponseError::Other(anyhow::anyhow!(message))),
},
}
}
Expand Down Expand Up @@ -727,6 +732,7 @@ mod tests {
let api = DefaultParaswapApi {
client: Client::new(),
partner: "Test".into(),
rate_limiter: None,
};

let good_query = TransactionBuilderQuery {
Expand Down
1 change: 1 addition & 0 deletions crates/shared/src/price_estimation/paraswap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,7 @@ mod tests {
let paraswap = DefaultParaswapApi {
client: Client::new(),
partner: "".to_string(),
rate_limiter: None,
};
let estimator = ParaswapPriceEstimator {
paraswap: Arc::new(paraswap),
Expand Down
1 change: 1 addition & 0 deletions crates/solver/src/solver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -346,6 +346,7 @@ pub fn create(
disabled_paraswap_dexs.clone(),
client.clone(),
paraswap_partner.clone(),
None,
),
solver_metrics.clone(),
))),
Expand Down
Loading

0 comments on commit a9eb00a

Please sign in to comment.