Skip to content

Commit

Permalink
dex: Extend arbitrage routing (#4292)
Browse files Browse the repository at this point in the history
## Describe your changes

This PR extends arbitrage routing to route against assets involved in
swaps and positions opened during the current block.

## Issue ticket number and link

#4177 

## Checklist before requesting a review

- [x] If this code contains consensus-breaking changes, I have added the
"consensus-breaking" label.
  • Loading branch information
zbuc authored May 14, 2024
1 parent 3fe73eb commit 8ed2ad7
Show file tree
Hide file tree
Showing 5 changed files with 88 additions and 9 deletions.
14 changes: 13 additions & 1 deletion crates/core/component/dex/src/component/action_handler/swap.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::sync::Arc;

use anyhow::{ensure, Result};
use async_trait::async_trait;
use cnidarium::StateWrite;
Expand All @@ -7,7 +9,9 @@ use penumbra_proto::StateWriteProto;
use penumbra_sct::component::source::SourceContext;

use crate::{
component::{metrics, StateReadExt, StateWriteExt, SwapManager},
component::{
metrics, position_manager::PositionManager as _, StateReadExt, StateWriteExt, SwapManager,
},
event,
swap::{proof::SwapProofPublic, Swap},
};
Expand Down Expand Up @@ -65,6 +69,14 @@ impl ActionHandler for Swap {
.add_swap_payload(self.body.payload.clone(), source)
.await;

// Mark the assets for the swap's trading pair as accessed during this block.
let fixed_candidates = Arc::new(dex_params.fixed_candidates.clone());
state.add_recently_accessed_asset(
swap.body.trading_pair.asset_1(),
fixed_candidates.clone(),
);
state.add_recently_accessed_asset(swap.body.trading_pair.asset_2(), fixed_candidates);

metrics::histogram!(crate::component::metrics::DEX_SWAP_DURATION)
.record(swap_start.elapsed());
state.record_proto(event::swap(self));
Expand Down
20 changes: 15 additions & 5 deletions crates/core/component/dex/src/component/dex.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use crate::{

use super::{
router::{HandleBatchSwaps, RoutingParams},
Arbitrage, PositionManager, ValueCircuitBreaker,
Arbitrage, PositionManager, PositionRead as _, ValueCircuitBreaker,
};

pub struct Dex {}
Expand Down Expand Up @@ -83,12 +83,22 @@ impl Component for Dex {
// For arbitrage, we extend the path search by 2 hops to allow a path out of the
// staking token and back.

// TODO: Build an extended candidate set with:
// - both ends of all trading pairs for which there were swaps in the block
// - both ends of all trading pairs for which positions were opened
// Extend the fixed candidate set to include recently accessed assets, to have
// more arbitrage execution against newly opened positions.
let fixed_candidates = Arc::new(
routing_params
.fixed_candidates
.iter()
.cloned()
// The set of recently accessed assets is already limited to avoid
// potentially blowing up routing time.
.chain(state.recently_accessed_assets().iter().cloned())
.collect::<Vec<_>>(),
);

let arb_routing_params = RoutingParams {
max_hops: routing_params.max_hops + 2,
fixed_candidates: routing_params.fixed_candidates.clone(),
fixed_candidates,
price_limit: Some(1u64.into()),
};

Expand Down
56 changes: 53 additions & 3 deletions crates/core/component/dex/src/component/position_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,12 @@ use penumbra_asset::{asset, Balance};
use penumbra_proto::DomainType;
use penumbra_proto::{StateReadProto, StateWriteProto};

use crate::component::position_manager::{
base_liquidity_index::AssetByLiquidityIndex, inventory_index::PositionByInventoryIndex,
price_index::PositionByPriceIndex,
use crate::component::{
dex::StateReadExt as _,
position_manager::{
base_liquidity_index::AssetByLiquidityIndex, inventory_index::PositionByInventoryIndex,
price_index::PositionByPriceIndex,
},
};
use crate::lp::Reserves;
use crate::{
Expand All @@ -26,6 +29,7 @@ use crate::{
use crate::{event, state_key};

const DYNAMIC_ASSET_LIMIT: usize = 10;
const RECENTLY_ACCESSED_ASSET_LIMIT: usize = 10;

mod base_liquidity_index;
mod counter;
Expand Down Expand Up @@ -142,6 +146,12 @@ pub trait PositionRead: StateRead {
})
.boxed()
}

/// Fetch the list of assets interacted with during this block.
fn recently_accessed_assets(&self) -> im::OrdSet<asset::Id> {
self.object_get(state_key::recently_accessed_assets())
.unwrap_or_default()
}
}
impl<T: StateRead + ?Sized> PositionRead for T {}

Expand Down Expand Up @@ -244,13 +254,53 @@ pub trait PositionManager: StateWrite + PositionRead {
self.vcb_credit(position.reserves_1()).await?;
self.vcb_credit(position.reserves_2()).await?;

// Add the asset IDs from the new position's trading pair
// to the candidate set for this block.
let routing_params = self.routing_params().await?;
self.add_recently_accessed_asset(
position.phi.pair.asset_1(),
routing_params.fixed_candidates.clone(),
);
self.add_recently_accessed_asset(
position.phi.pair.asset_2(),
routing_params.fixed_candidates,
);

// Finally, record the new position state.
self.record_proto(event::position_open(&position));
self.update_position(None, position).await?;

Ok(())
}

/// Adds an asset ID to the list of recently accessed assets,
/// making it a candidate for the current block's arbitrage routing.
///
/// This ensures that assets associated with recently active positions
/// will be eligible for arbitrage if mispriced positions are opened.
#[tracing::instrument(level = "debug", skip_all)]
fn add_recently_accessed_asset(
&mut self,
asset_id: asset::Id,
fixed_candidates: Arc<Vec<asset::Id>>,
) {
let mut assets = self.recently_accessed_assets();

// Limit the number of recently accessed assets to prevent blowing
// up routing time.
if assets.len() >= RECENTLY_ACCESSED_ASSET_LIMIT {
return;
}

// If the asset is already in the fixed candidate list, don't insert it.
if fixed_candidates.contains(&asset_id) {
return;
}

assets.insert(asset_id);
self.object_put(state_key::recently_accessed_assets(), assets);
}

/// Record execution against an opened position.
///
/// IMPORTANT: This method can mutate its input state.
Expand Down
3 changes: 3 additions & 0 deletions crates/core/component/dex/src/component/router/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ async fn path_search_basic() {
async fn path_extension_basic() {
let _ = tracing_subscriber::fmt::try_init();
let mut state = StateDelta::new(());
state.put_dex_params(DexParameters::default());

// Write some test positions with a mispriced gn:pusd pair.
create_test_positions_basic(&mut state, true).await;
Expand Down Expand Up @@ -131,6 +132,8 @@ async fn path_extension_basic() {
// Reset the state.
let mut state = StateDelta::new(());

state.put_dex_params(DexParameters::default());

// Write some test positions without the mispriced position.
create_test_positions_basic(&mut state, false).await;

Expand Down
4 changes: 4 additions & 0 deletions crates/core/component/dex/src/state_key.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,10 @@ pub fn pending_position_closures() -> &'static str {
"dex/pending_position_closures"
}

pub fn recently_accessed_assets() -> &'static str {
"dex/recently_accessed_assets"
}

pub fn pending_payloads() -> &'static str {
"dex/pending_payloads"
}
Expand Down

0 comments on commit 8ed2ad7

Please sign in to comment.