Skip to content

Commit

Permalink
Fix metrics and observations in autopilot runloop (#3039)
Browse files Browse the repository at this point in the history
# Description
This PR addresses several smaller issues related to metrics in the
autopilot run loop and refactors functions in preparation for
#2996 (comment).

# Changes
<!-- List of detailed changes (how the change is accomplished) -->

- [ ] `Self::is_solution_fair` is now checked for every solution, not
just for the global winner (fixing an oversight in [PR
#2996](#2996))
- [ ] Updated the "matched but unsettled" metric to include all winners,
not just the global winner (fixing another oversight from [PR
#2996](#2996)).
- [ ] Reorganized logs and order status updates to facilitate extracting
common functionality between shadow and main competitions (to be
addressed in follow-up work).

Next step: refactor `competition` function from shadow and main runloop
to be exactly the same and unify the `Participant` struct (currently
both modes have their own version).

## How to test
Existing e2e tests
  • Loading branch information
sunce86 authored Oct 8, 2024
1 parent 416c767 commit b020020
Show file tree
Hide file tree
Showing 4 changed files with 114 additions and 87 deletions.
2 changes: 1 addition & 1 deletion crates/autopilot/src/domain/competition/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use {

type SolutionId = u64;

#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct Solution {
id: SolutionId,
solver: eth::Address,
Expand Down
8 changes: 4 additions & 4 deletions crates/autopilot/src/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -453,7 +453,7 @@ pub async fn run(args: Arguments) {
hardcoded: args.trusted_tokens.unwrap_or_default(),
};
// updated in background task
let market_makable_token_list =
let trusted_tokens =
AutoUpdatingTokenList::from_configuration(market_makable_token_list_configuration).await;

let mut maintenance = Maintenance::new(
Expand Down Expand Up @@ -546,7 +546,7 @@ pub async fn run(args: Arguments) {
})
.collect(),
solvable_orders_cache,
market_makable_token_list,
trusted_tokens,
liveness.clone(),
Arc::new(maintenance),
);
Expand All @@ -566,11 +566,11 @@ async fn shadow_mode(args: Arguments) -> ! {
.drivers
.into_iter()
.map(|driver| {
infra::Driver::new(
Arc::new(infra::Driver::new(
driver.url,
driver.name,
driver.fairness_threshold.map(Into::into),
)
))
})
.collect();

Expand Down
187 changes: 107 additions & 80 deletions crates/autopilot/src/run_loop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ use {
rand::seq::SliceRandom,
shared::token_list::AutoUpdatingTokenList,
std::{
collections::{HashMap, HashSet, VecDeque},
collections::{HashMap, HashSet},
sync::Arc,
time::{Duration, Instant},
},
Expand All @@ -61,7 +61,7 @@ pub struct RunLoop {
persistence: infra::Persistence,
drivers: Vec<Arc<infra::Driver>>,
solvable_orders_cache: Arc<SolvableOrdersCache>,
market_makable_token_list: AutoUpdatingTokenList,
trusted_tokens: AutoUpdatingTokenList,
in_flight_orders: Arc<Mutex<HashSet<OrderUid>>>,
liveness: Arc<Liveness>,
/// Maintenance tasks that should run before every runloop to have
Expand All @@ -80,7 +80,7 @@ impl RunLoop {
persistence: infra::Persistence,
drivers: Vec<Arc<infra::Driver>>,
solvable_orders_cache: Arc<SolvableOrdersCache>,
market_makable_token_list: AutoUpdatingTokenList,
trusted_tokens: AutoUpdatingTokenList,
liveness: Arc<Liveness>,
maintenance: Arc<Maintenance>,
) -> Self {
Expand All @@ -97,7 +97,7 @@ impl RunLoop {
persistence,
drivers,
solvable_orders_cache,
market_makable_token_list,
trusted_tokens,
in_flight_orders: Default::default(),
liveness,
maintenance,
Expand Down Expand Up @@ -245,13 +245,29 @@ impl RunLoop {

let auction = self.remove_in_flight_orders(auction).await;

// Mark all auction orders as `Ready` for competition
self.persistence.store_order_events(
auction.orders.iter().map(|o| OrderUid(o.uid.0)),
OrderEventLabel::Ready,
);

// Collect valid solutions from all drivers
let solutions = self.competition(&auction).await;
observe::solutions(&solutions);

// Pick winners for execution
let winners = self.select_winners(&solutions);
if winners.is_empty() {
tracing::info!("no winners for auction");
return;
}

// Mark all non-winning orders as `Considered` for execution
self.persistence.store_order_events(
non_winning_orders(&solutions, &winners),
OrderEventLabel::Considered,
);

let competition_simulation_block = self.eth.current_block().borrow().number;
let block_deadline = competition_simulation_block + self.config.submission_deadline;

Expand All @@ -273,9 +289,14 @@ impl RunLoop {
return;
}

observe::unsettled(&solutions, &winners, &auction);
for Participant { driver, solution } in winners {
tracing::info!(driver = %driver.name, solution = %solution.id(), "winner");

// Mark all winning orders as `Executing`
self.persistence
.store_order_events(solution.order_ids().copied(), OrderEventLabel::Executing);

self.start_settlement_execution(
auction.id,
single_run_start,
Expand Down Expand Up @@ -383,7 +404,7 @@ impl RunLoop {
auction: &domain::Auction,
competition_simulation_block: u64,
winning_solution: &competition::Solution,
solutions: &VecDeque<Participant>,
solutions: &[Participant],
block_deadline: u64,
) -> Result<()> {
let start = Instant::now();
Expand Down Expand Up @@ -501,18 +522,14 @@ impl RunLoop {

/// Runs the solver competition, making all configured drivers participate.
/// Returns all fair solutions sorted by their score (best to worst).
async fn competition(&self, auction: &domain::Auction) -> VecDeque<Participant> {
async fn competition(&self, auction: &domain::Auction) -> Vec<Participant> {
let request = solve::Request::new(
auction,
&self.market_makable_token_list.all(),
&self.trusted_tokens.all(),
self.config.solve_deadline,
);
let request = &request;

let order_uids = auction.orders.iter().map(|o| OrderUid(o.uid.0));
self.persistence
.store_order_events(order_uids, OrderEventLabel::Ready);

let mut solutions = futures::future::join_all(
self.drivers
.iter()
Expand All @@ -529,16 +546,22 @@ impl RunLoop {
std::cmp::Reverse(participant.solution.score().get().0)
});

// Make sure the winning solution is fair.
let mut solutions = solutions.into_iter().collect::<VecDeque<_>>();
while !Self::is_solution_fair(solutions.front(), &solutions, auction) {
let unfair_solution = solutions.pop_front().expect("must exist");
tracing::warn!(
invalidated = unfair_solution.driver.name,
"fairness check invalidated of solution"
);
}
self.report_on_solutions(&solutions, auction);
// Filter out solutions that are not fair
let solutions = solutions
.iter()
.enumerate()
.filter_map(|(index, participant)| {
if Self::is_solution_fair(participant, &solutions[index..], auction) {
Some(participant.clone())
} else {
tracing::warn!(
invalidated = participant.driver.name,
"fairness check invalidated of solution"
);
None
}
})
.collect();

solutions
}
Expand All @@ -551,7 +574,7 @@ impl RunLoop {
/// until `max_winners_per_auction` are selected. The solution is a winner
/// if it swaps tokens that are not yet swapped by any other already
/// selected winner.
fn select_winners<'a>(&self, participants: &'a VecDeque<Participant>) -> Vec<&'a Participant> {
fn select_winners<'a>(&self, participants: &'a [Participant]) -> Vec<&'a Participant> {
let mut winners = Vec::new();
let mut already_swapped_tokens = HashSet::new();
for participant in participants.iter() {
Expand All @@ -572,59 +595,13 @@ impl RunLoop {
winners
}

/// Records metrics, order events and logs for the given solutions.
/// Expects the winning solution to be the first in the list.
fn report_on_solutions(&self, solutions: &VecDeque<Participant>, auction: &domain::Auction) {
let Some(winner) = solutions.front() else {
// no solutions means nothing to report
return;
};

solutions.iter().for_each(|solution| {
tracing::debug!(
driver=%solution.driver.name,
orders=?solution.solution.order_ids(),
solution=solution.solution.id(),
"proposed solution"
);
});

let proposed_orders: HashSet<_> = solutions
.iter()
.flat_map(|solution| solution.solution.order_ids().copied())
.collect();
let winning_orders: HashSet<_> = solutions
.front()
.into_iter()
.flat_map(|solution| solution.solution.order_ids().copied())
.collect();
let mut non_winning_orders: HashSet<_> = proposed_orders
.difference(&winning_orders)
.cloned()
.collect();
self.persistence.store_order_events(
non_winning_orders.iter().cloned(),
OrderEventLabel::Considered,
);
self.persistence
.store_order_events(winning_orders, OrderEventLabel::Executing);

let auction_uids = auction.orders.iter().map(|o| o.uid).collect::<HashSet<_>>();

// Report orders that were part of a non-winning solution candidate
// but only if they were part of the auction (filter out jit orders)
non_winning_orders.retain(|uid| auction_uids.contains(uid));
Metrics::matched_unsettled(&winner.driver, non_winning_orders);
}

/// Returns true if winning solution is fair or winner is None
/// Returns true if solution is fair to other solutions
fn is_solution_fair(
winner: Option<&Participant>,
remaining: &VecDeque<Participant>,
solution: &Participant,
others: &[Participant],
auction: &domain::Auction,
) -> bool {
let Some(winner) = winner else { return true };
let Some(fairness_threshold) = winner.driver.fairness_threshold else {
let Some(fairness_threshold) = solution.driver.fairness_threshold else {
return true;
};

Expand Down Expand Up @@ -652,7 +629,7 @@ impl RunLoop {

// Record best execution per order
let mut best_executions = HashMap::new();
for other in remaining {
for other in others {
for (uid, execution) in other.solution.orders() {
best_executions
.entry(uid)
Expand All @@ -665,24 +642,24 @@ impl RunLoop {
}
}

// Check if the winning solution contains an order whose execution in the
// winning solution is more than `fairness_threshold` worse than the
// Check if the solution contains an order whose execution in the
// solution is more than `fairness_threshold` worse than the
// order's best execution across all solutions
let unfair = winner
let unfair = solution
.solution
.orders()
.iter()
.any(|(uid, winning_execution)| {
.any(|(uid, current_execution)| {
let best_execution = best_executions.get(uid).expect("by construction above");
let improvement = improvement_in_buy(best_execution, winning_execution);
let improvement = improvement_in_buy(best_execution, current_execution);
if improvement.is_zero() {
return false;
};
tracing::debug!(
?uid,
?improvement,
?best_execution,
?winning_execution,
?current_execution,
"fairness check"
);
// Improvement is denominated in buy token, use buy price to normalize the
Expand Down Expand Up @@ -897,7 +874,23 @@ impl RunLoop {
}
}

struct Participant {
fn non_winning_orders(solutions: &[Participant], winners: &[&Participant]) -> HashSet<OrderUid> {
let proposed_orders: HashSet<_> = solutions
.iter()
.flat_map(|participant| participant.solution.order_ids().copied())
.collect();
let winning_orders: HashSet<_> = winners
.iter()
.flat_map(|participant| participant.solution.order_ids().copied())
.collect();
proposed_orders
.difference(&winning_orders)
.cloned()
.collect()
}

#[derive(Clone)]
pub struct Participant {
driver: Arc<infra::Driver>,
solution: competition::Solution,
}
Expand Down Expand Up @@ -1103,4 +1096,38 @@ pub mod observe {
"Orders no longer in auction"
);
}

pub fn solutions(solutions: &[super::Participant]) {
if solutions.is_empty() {
tracing::info!("no solutions for auction");
}
for participant in solutions {
tracing::debug!(
driver = %participant.driver.name,
orders = ?participant.solution.order_ids(),
solution = %participant.solution.id(),
"proposed solution"
);
}
}

/// Records metrics for the matched but unsettled orders.
pub fn unsettled(
solutions: &[super::Participant],
winners: &[&super::Participant],
auction: &domain::Auction,
) {
let Some(winner) = winners.first() else {
// no solutions means nothing to report
return;
};

let auction_uids = auction.orders.iter().map(|o| o.uid).collect::<HashSet<_>>();

let mut non_winning_orders = super::non_winning_orders(solutions, winners);
// Report orders that were part of a non-winning solution candidate
// but only if they were part of the auction (filter out jit orders)
non_winning_orders.retain(|uid| auction_uids.contains(uid));
super::Metrics::matched_unsettled(&winner.driver, non_winning_orders);
}
}
4 changes: 2 additions & 2 deletions crates/autopilot/src/shadow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ use {

pub struct RunLoop {
orderbook: infra::shadow::Orderbook,
drivers: Vec<infra::Driver>,
drivers: Vec<Arc<infra::Driver>>,
trusted_tokens: AutoUpdatingTokenList,
auction: domain::auction::Id,
block: u64,
Expand All @@ -50,7 +50,7 @@ impl RunLoop {
#[allow(clippy::too_many_arguments)]
pub fn new(
orderbook: infra::shadow::Orderbook,
drivers: Vec<infra::Driver>,
drivers: Vec<Arc<infra::Driver>>,
trusted_tokens: AutoUpdatingTokenList,
solve_deadline: Duration,
liveness: Arc<Liveness>,
Expand Down

0 comments on commit b020020

Please sign in to comment.