DEX solvers rate limiter #2071
Conversation
})
}

pub async fn execute_with_retries<T, F, Fut>(
I think we should probably not manage retries and backoff times in 2 places (this rate limiter and the shared rate limiter). Simply exposing execute() should be sufficient.
I wonder if we should even bake retry logic into the rate limiter configuration (I don't think we need it in this use case; if a single-order dex solver got rate limited, it's fine for it to skip an order). Otherwise we may keep backing off in unexpected ways (e.g. getting stuck on a single order past our deadline).
From an API perspective, I'd prefer to specify the retry behaviour as part of execute (as it feels to me the call site should be explicitly aware of that behaviour), but again I don't think we need it here, so it may be simpler to just not implement automatic retries in the rate limiter.
If we decide to implement retries, I agree we should make it part of the shared rate limiter so that the behaviour is consistent for all usages.
if a single order dex solver got rate limited it's fine for it to skip an order
Considering this, I see the following solution: if an order is skipped due to rate limiting, we wait until the cool-down ends and then continue with the following orders. That means the retry logic won't even be required. The only thing that needs to be added is a blocking mechanism for the thread to cool down.
This could be implemented in the shared rate limiter with a separate method like execute_with_cooldown, or in the service business logic itself.
Suggestion by Martin:
let response = execute().await;
if response.rate_limited() {
    wait_until_ready()
}
Let me know if that makes sense.
I wouldn't make the rate limiter block from giving us a response until the cooldown period is reached (this seems like the worst of both worlds, the initial request is slow only to tell us that we are rate limited).
If you feel strongly about delaying the next request (instead of the rate limiter just rejecting all subsequent requests as it's done for current rate limits), I think a retry is better. I'd just
- implement it in the shared component
- have the caller specify the number of retries (rather than it being a top level struct parameter)
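For illustration, here is a compilable sketch of that shape, where the retry budget is an argument of the call rather than a struct field. The RateLimiterError, RateLimiter, and execute stand-ins below are simplified placeholders, not the shared crate's actual types:

```rust
use std::future::Future;

// Simplified stand-ins so the sketch compiles on its own; the real types
// live in the shared crate.
enum RateLimiterError {
    RateLimited,
}

struct RateLimiter;

impl RateLimiter {
    async fn execute<T>(
        &self,
        task: impl Future<Output = T>,
        _requires_back_off: impl Fn(&T) -> bool,
    ) -> Result<T, RateLimiterError> {
        // Placeholder: the real implementation tracks back-off state.
        Ok(task.await)
    }

    // The point of the sketch: the caller decides the retry budget per call.
    async fn execute_with_retries<T, F, Fut>(
        &self,
        task: F,
        requires_back_off: impl Fn(&T) -> bool + Clone,
        max_retries: u32,
    ) -> Result<T, RateLimiterError>
    where
        F: Fn() -> Fut,
        Fut: Future<Output = T>,
    {
        let mut retries = 0;
        loop {
            match self.execute(task(), requires_back_off.clone()).await {
                Err(RateLimiterError::RateLimited) if retries < max_retries => retries += 1,
                other => return other,
            }
        }
    }
}
```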
I can explain why I decided to go with the retry: I don't really see any sense in returning a RateLimited error for all the subsequent orders, meaning the whole auction would be skipped, since iterating over all the orders is much faster than the cool-down period.
Instead, we can give it a chance to cool down and try again within the original deadline.
Correct me if my assumption is wrong.
the initial request is slow only to tell us that we are rate limited
My previous suggestion was actually wrong:
if an order is skipped due to rate limiting, we wait until the cool-down ends and then continue with the following orders.
What I meant was the opposite: if the first request is rate-limited, then the following one waits until the cool-down is over.
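For what it's worth, a minimal self-contained sketch of that behaviour, where the next call waits out the remaining cool-down instead of failing fast. The CoolingRateLimiter type and its method are made up for illustration (and assume tokio); this is not the shared rate limiter's API:

```rust
use std::time::{Duration, Instant};

// Illustrative only: remember when the current cool-down ends and make the
// *next* call wait it out instead of failing fast.
struct CoolingRateLimiter {
    blocked_until: Option<Instant>,
}

impl CoolingRateLimiter {
    async fn execute_waiting<T>(
        &mut self,
        task: impl std::future::Future<Output = T>,
        rate_limited: impl Fn(&T) -> bool,
        back_off: Duration,
    ) -> T {
        // Wait out any cool-down left over from a previous rate-limited call.
        if let Some(until) = self.blocked_until.take() {
            tokio::time::sleep_until(until.into()).await;
        }
        let result = task.await;
        // A rate-limited response starts a new cool-down for the next caller.
        if rate_limited(&result) {
            self.blocked_until = Some(Instant::now() + back_off);
        }
        result
    }
}
```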
{
    let mut retries = 0;
    while retries < self.max_retries {
        let result = self.execute(task(), requires_back_off.clone()).await;
And this clone could be avoided if we make the old rate limiter take a reference to requires_back_off instead of a value.
Might be worthwhile to do.
Does it mean that the caller needs to ensure that the closure passed to the function lives long enough for the duration of the function call? In asynchronous contexts this can be tricky, because the future returned by the function might outlive the closure, especially if it's a temporary closure. That concern is actually what made me think about cloning.
Does it mean that the caller needs to ensure that the closure passed to the function lives long enough for the duration of the function call.
Yes but this is usually easy to do by cloning the closure and then moving it into the future:
let cloned_closure = *closure_ref.clone();
return async move || {
    use_by_ref(&cloned_closure())
}
I think this is a bit nicer because cloning in this case is opt-in instead of always required. BTW, before making this suggestion I checked that our current code wouldn't run into borrowing issues.
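A small self-contained example of that clone-then-move pattern; names are illustrative and the u32 result just stands in for the real task output:

```rust
use std::future::Future;

// The function borrows the closure; only the code path that actually needs
// to move it into a future pays for a clone.
fn check_later<F>(requires_back_off: &F) -> impl Future<Output = bool> + 'static
where
    F: Fn(&u32) -> bool + Clone + 'static,
{
    let requires_back_off = requires_back_off.clone();
    async move {
        let result = 42_u32;
        requires_back_off(&result)
    }
}
```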
Looks very good! I'd remove the added logic (as I don't think we need it right now and it adds a fair share of complexity) and make the rate limiter local to the dex config.
}

fn default_max_back_off() -> Duration {
    Duration::default()
TBD: Turning it off by default.
Can we use the same default values that we already have for the rate limiter in the orderbook/autopilot crate?
The default for the shared rate limiter is:
Self::try_new(1.0, Duration::default(), Duration::default()).unwrap()
Looks good. I'd make sure the configuration stays as consistent with existing usage as possible.
@@ -174,7 +174,7 @@ impl RateLimiter {
    }
}

-#[derive(Error, Debug, Clone, Default)]
+#[derive(Error, Debug, Clone, Default, PartialEq)]
Is this required somewhere or just for completeness?
Not required anymore for the latest code revision.
back_off_growth_factor: f64,

/// Minimum back-off time in seconds for rate limiting.
#[serde(with = "humantime_serde", default = "default_min_back_off")]
I'm pretty sure we already configure values like this elsewhere in the code and would like to use consistent parsing. If humantime makes things much easier, we can replace all occurrences in a follow-up PR.
From what I found, other configs specify units explicitly by providing the function value_parser = duration_from_seconds to parse Duration values. To switch between units we need to change the code, which is not flexible. With humantime, we can use the more common approach of defining units directly in the config, like 1s, 1000ms, etc. If I didn't miss anything, we could integrate humantime everywhere in a separate PR. Removed it from this PR to stay consistent.
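For illustration, this is roughly what the humantime_serde variant from the diff looks like on a config struct (the struct name is made up; assumes the serde and humantime-serde crates):

```rust
use std::time::Duration;
use serde::Deserialize;

// Illustrative config struct; the field attribute and default follow the diff above.
#[derive(Deserialize)]
struct RateLimitingConfig {
    /// Accepts human-readable values such as "1s" or "1000ms".
    #[serde(with = "humantime_serde", default = "default_min_back_off")]
    min_back_off: Duration,
}

fn default_min_back_off() -> Duration {
    Duration::default()
}
```

A config entry such as min_back_off = "500ms" would then deserialize directly into a Duration (the exact key spelling depends on the crate's serde rename rules).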
)
.await
.map_err(|err| match err {
    RateLimiterError::RateLimited => infra::dex::Error::RateLimited,
Unlike in try_solve, we are not logging that we got rate limited here.
The error will be logged either here:
https://github.com/cowprotocol/services/blob/33d1a1eb879a0031e8a10a799d21cab4809e336d/crates/shared/src/rate_limiter.rs#L200C24-L200C24
or here: https://github.com/cowprotocol/services/blob/33d1a1eb879a0031e8a10a799d21cab4809e336d/crates/shared/src/rate_limiter.rs#L200C24-L200C24
I think there is no need to create one more log.
    tracing::debug!(?err, "skipping order")
}
err @ infra::dex::Error::RateLimited => {
    tracing::debug!(?err, "encountered rate limit")
Is this codepath even possible? I believe this error would be caught by the rate limiter component and returned in solve_order
The error is produced before the rate limiter is involved and is then used by the limiter in the requires_back_off closure. The rate limiter either fails fast if the back-off timeout is not over yet, or executes the task and returns the RateLimited error.
let swap = self
    .rate_limiter
    .execute_with_back_off(
        self.try_solve(order, &dex_order, tokens, gas_price),
        |result| matches!(result, Err(infra::dex::Error::RateLimited)),
Maybe move this part into try_solver so that we have one method which returns the high level Result that we care about.
Moved
Looks okay to me.
Just curious if we are now fine with the possibility of skipping orders.
assert_eq!(result, 1);
{
    let current_strategy = rate_limiter.strategy.lock().unwrap();
    assert!(current_strategy.drop_requests_until < original_drop_until.add(timeout));
This is to assert that we didn't have to wait for the allowed backoff time, right?
I think the current code might be written such that you can even assert that we didn't have to wait at all.
For that you can replace .await with now_or_never().unwrap(). now_or_never() polls the future exactly once and only returns a result if it immediately resolved.
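A tiny illustration of the pattern; in the real test the trivial async { 1 } future would be the rate_limiter.execute(...) call (assumes the futures crate):

```rust
use futures::FutureExt;

#[test]
fn resolves_immediately() {
    // now_or_never() polls the future exactly once and returns Some(..) only
    // if it resolved right away, so unwrap() fails the test if any waiting
    // would have been required.
    let result = async { 1 }.now_or_never().unwrap();
    assert_eq!(result, 1);
}
```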
Replaced
    drop_until
};

let result = rate_limiter.execute(async { 1 }, |_| false).await;
nit: Would be nice to have some comments on what each individual block is testing.
Added
crates/shared/src/rate_limiter.rs
#[tokio::test]
async fn test_execute_with_back_off() {
    let timeout = Duration::from_secs(3);
To test timing-sensitive stuff it's usually sufficient to use timeouts in the tens of milliseconds, to not delay the test suite unnecessarily.
Reduced the timeout
) -> Option<dex::Swap> {
    self.rate_limiter
        .execute_with_back_off(
            async {
nit: To avoid unnecessary indentation you can first create the future and then pass it to execute_with_back_off().
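A generic, self-contained illustration of the nit; execute_with_back_off here is just a stand-in so the snippet compiles on its own:

```rust
use std::future::Future;

// Stand-in for the rate limiter wrapper, only so the sketch compiles.
async fn execute_with_back_off<T>(task: impl Future<Output = T>) -> T {
    task.await
}

async fn solve_flat() -> u64 {
    // Building the future first keeps the wrapper call flat instead of
    // nesting the whole body one level deeper inside the call.
    let task = async {
        let quote = 40 + 2; // placeholder for the actual dex request
        quote
    };
    execute_with_back_off(task).await
}
```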
Extracted
@@ -86,42 +101,65 @@ impl Dex {
    .filter_map(future::ready)
}

async fn try_solve(
Since we don't retry until we get a non-rate-limited result, we could still skip orders.
IMO this is fine, but I think it is what you wanted to avoid with the initial implementation.
Will create a separate issue to consider using a retry mechanism, so as not to overload this PR.
.execute_with_back_off(
    async {
        let slippage = self.slippage.relative(&dex_order.amount(), tokens);
        self.dex
Would probably also store the result in a variable to avoid having the match so far to the right.
That was actually the initial approach, and it produced too much redundant code: returning the same err in each case and doing nothing with the result. Extracted the error handler instead. Let me know if that's still not readable enough.
Looks good, just a few nits
};

pub struct RateLimiter {
    inner: SharedRateLimiter,
nit: I would prefer to have inner: shared::RateLimiter here, to be consistent with how we refer to shared components from the boundary.
That is actually shared::rate_limiter::RateLimiter, which is quite long and will be used across the code. That's why I decided to stick with the shorter version.
You should be able to re-export RateLimiter at the root of shared with pub use and therefore skip ::rate_limiter.
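Concretely, something like this at the root of the shared crate (location assumed):

```rust
// crates/shared/src/lib.rs (assumed location)
pub mod rate_limiter;
pub use rate_limiter::RateLimiter;
```

Callers could then write shared::RateLimiter instead of the full shared::rate_limiter::RateLimiter path.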
crates/solvers/src/run.rs
@@ -47,28 +48,32 @@ async fn run_with(args: cli::Args, bind: Option<oneshot::Sender<SocketAddr>>) {
    dex::Dex::ZeroEx(
        dex::zeroex::ZeroEx::new(config.zeroex).expect("invalid 0x configuration"),
    ),
-   config.base,
+   config.base.clone(),
+   RateLimiter::new(config.base.rate_limiting_strategy, "dex_api".to_string()),
Why call RateLimiter::new here if we can do it inside solver::Dex::new? (It only depends on the config, which is available there.)
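Sketched out, the suggestion is roughly the following; this is only a fragment, and the parameter and field names are approximated from the diff rather than taken from the actual code:

```rust
// Hypothetical constructor: the rate limiter is derived from the config that
// Dex::new already receives, so run.rs no longer needs to build it.
impl Dex {
    pub fn new(dex: infra::dex::Dex, config: dex::Config) -> Self {
        let rate_limiter =
            RateLimiter::new(config.rate_limiting_strategy, "dex_api".to_string());
        Self { dex, config, rate_limiter }
    }
}
```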
Makes sense, rearranged
Re-approving since this PR changed a bit since last time.
Description
Properly handles 429 Too Many Requests responses from DEX solvers by utilizing shared::RateLimiter to back off further requests with a cool-down period.

Changes
- New RateLimited variant on solvers::infra::dex::Error
- Exposed shared::rate_limiter::RateLimiter through solvers::boundary
- New max_retries configuration parameter for the solvers crate

How to test
TBD: is it possible to simulate it in staging?
Related Issues
Fixes #2068