Skip to content

Commit

Permalink
Concurrent solvers (#1851)
Browse files Browse the repository at this point in the history
This PR add concurrency to the single order solvers in the `solvers`
binary. Fairly simple, nothing complicated.

### Test Plan

Tests continue to pass with new concurrency parameter.
  • Loading branch information
Nicholas Rodrigues Lordello authored Sep 8, 2023
1 parent 3a9eef0 commit c901e38
Show file tree
Hide file tree
Showing 4 changed files with 42 additions and 17 deletions.
2 changes: 1 addition & 1 deletion crates/solvers/src/domain/order.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ pub enum Class {
/// the other hand, liquidity orders are CoW Protocol orders, meaning that they
/// first provide the tokens being swapped to and only get paid at the end of
/// the settlement.
#[derive(Debug)]
#[derive(Clone, Copy, Debug)]
pub struct UserOrder<'a>(&'a Order);

impl<'a> UserOrder<'a> {
Expand Down
40 changes: 26 additions & 14 deletions crates/solvers/src/domain/solver/dex/mod.rs
Original file line number Diff line number Diff line change
@@ -1,23 +1,28 @@
//! A simple solver that matches orders directly with swaps from the external
//! DEX and DEX aggregator APIs.
mod fills;

use {
crate::{
domain::{auction, dex::slippage, order, solution, solver::dex::fills::Fills},
infra,
},
futures::{future, stream, StreamExt},
std::num::NonZeroUsize,
tracing::Instrument,
};

mod fills;

pub struct Dex {
/// The DEX API client.
dex: infra::dex::Dex,

/// The slippage configuration to use for the solver.
slippage: slippage::Limits,

/// The number of concurrent requests to make.
concurrent_requests: NonZeroUsize,

/// Helps to manage the strategy to fill orders (especially partially
/// fillable orders).
fills: Fills,
Expand All @@ -28,27 +33,20 @@ impl Dex {
Self {
dex,
slippage: config.slippage,
concurrent_requests: config.concurrent_requests,
fills: Fills::new(config.smallest_partial_fill),
}
}

pub async fn solve(&self, auction: auction::Auction) -> Vec<solution::Solution> {
// TODO:
// * concurrency

let mut solutions = Vec::new();
let solve_orders = async {
for order in auction.orders.iter().filter_map(order::UserOrder::new) {
let span = tracing::info_span!("solve", order = %order.get().uid);
if let Some(solution) = self
.solve_order(order, &auction.tokens, auction.gas_price)
.instrument(span)
.await
{
solutions.push(solution);
}
let mut stream = self.solution_stream(&auction);
while let Some(solution) = stream.next().await {
solutions.push(solution);
}
};

let deadline = auction.deadline.remaining().unwrap_or_default();
if tokio::time::timeout(deadline, solve_orders).await.is_err() {
tracing::debug!("reached deadline; stopping to solve");
Expand All @@ -59,6 +57,20 @@ impl Dex {
solutions
}

fn solution_stream<'a>(
&'a self,
auction: &'a auction::Auction,
) -> impl stream::Stream<Item = solution::Solution> + 'a {
stream::iter(auction.orders.iter().filter_map(order::UserOrder::new))
.map(|order| {
let span = tracing::info_span!("solve", order = %order.get().uid);
self.solve_order(order, &auction.tokens, auction.gas_price)
.instrument(span)
})
.buffer_unordered(self.concurrent_requests.get())
.filter_map(future::ready)
}

async fn solve_order(
&self,
order: order::UserOrder<'_>,
Expand Down
11 changes: 10 additions & 1 deletion crates/solvers/src/infra/config/dex/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use {
bigdecimal::BigDecimal,
serde::{de::DeserializeOwned, Deserialize},
serde_with::serde_as,
std::{fmt::Debug, path::Path},
std::{fmt::Debug, num::NonZeroUsize, path::Path},
tokio::fs,
};

Expand All @@ -25,6 +25,10 @@ struct Config {
#[serde_as(as = "Option<serde_with::DisplayFromStr>")]
absolute_slippage: Option<eth::U256>,

/// The number of concurrent requests to make to the DEX aggregator API.
#[serde(default = "default_concurrent_requests")]
concurrent_requests: NonZeroUsize,

/// The amount of Ether a partially fillable order should be filled for at
/// least.
#[serde(default = "default_smallest_partial_fill")]
Expand All @@ -38,6 +42,10 @@ fn default_relative_slippage() -> BigDecimal {
BigDecimal::new(1.into(), 2) // 1%
}

fn default_concurrent_requests() -> NonZeroUsize {
NonZeroUsize::new(1).unwrap()
}

fn default_smallest_partial_fill() -> eth::U256 {
eth::U256::exp10(16) // 0.01 ETH
}
Expand All @@ -63,6 +71,7 @@ pub async fn load<T: DeserializeOwned>(path: &Path) -> (super::Config, T) {
config.absolute_slippage.map(eth::Ether),
)
.expect("invalid slippage limits"),
concurrent_requests: config.concurrent_requests,
smallest_partial_fill: eth::Ether(config.smallest_partial_fill),
};

Expand Down
6 changes: 5 additions & 1 deletion crates/solvers/src/infra/config/dex/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,13 @@ pub mod oneinch;
pub mod paraswap;
pub mod zeroex;

use crate::domain::{dex::slippage, eth};
use {
crate::domain::{dex::slippage, eth},
std::num::NonZeroUsize,
};

pub struct Config {
pub slippage: slippage::Limits,
pub concurrent_requests: NonZeroUsize,
pub smallest_partial_fill: eth::Ether,
}

0 comments on commit c901e38

Please sign in to comment.