Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

dex: create liquidity indices #2843

Merged
merged 9 commits into from
Jul 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/core/component/dex/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ ark-serialize = "0.4"
ark-groth16 = {version = "0.4", default-features = false}
ark-snark = "0.4"
async-trait = "0.1.52"
async-stream = "0.2"
tokio = {version = "1.3", features = ["full"], optional = true}
hex = "0.4"
thiserror = "1"
Expand Down
225 changes: 214 additions & 11 deletions crates/core/component/dex/src/component/position_manager.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,25 @@
use std::{collections::BTreeSet, iter::FromIterator, pin::Pin, sync::Arc};
use std::future;
use std::{pin::Pin, sync::Arc};

use anyhow::Result;
use async_stream::try_stream;
use async_trait::async_trait;
use futures::Stream;
use futures::StreamExt;
use penumbra_asset::asset;
use penumbra_num::Amount;
use penumbra_proto::DomainType;
use penumbra_proto::{StateReadProto, StateWriteProto};
use penumbra_storage::{EscapedByteSlice, StateRead, StateWrite};

use crate::lp::position::State;
use crate::{
lp::position::{self, Position},
state_key, DirectedTradingPair,
};

const DYNAMIC_ASSET_LIMIT: usize = 10;

#[async_trait]
pub trait PositionRead: StateRead {
/// Return a stream of all [`position::Metadata`] available.
Expand Down Expand Up @@ -138,12 +144,16 @@ pub trait PositionManager: StateWrite + PositionRead {
// reserves or the position state might have invalidated them.
self.deindex_position_by_price(&position);

let position = self.handle_limit_order(prev, position);
let position = self.handle_limit_order(&prev, position);

// Only index the position's liquidity if it is active.
if position.state == position::State::Opened {
self.index_position_by_price(&position);
}

// Update the available liquidity for this position's trading pair.
self.update_available_liquidity(&position, &prev).await?;

self.put(state_key::position_by_id(&id), position);
Ok(())
}
Expand All @@ -153,7 +163,7 @@ pub trait PositionManager: StateWrite + PositionRead {
/// not a limit order, or has not been filled, it is returned unchanged.
fn handle_limit_order(
&self,
prev_position: Option<position::Position>,
prev_position: &Option<position::Position>,
position: Position,
) -> Position {
let id = position.id();
Expand Down Expand Up @@ -187,16 +197,47 @@ pub trait PositionManager: StateWrite + PositionRead {
/// Combines a list of fixed candidates with a list of liquidity-based candidates.
/// This ensures that the fixed candidates are always considered, minimizing
/// the risk of attacks on routing.
async fn candidate_set(
fn candidate_set(
&self,
_from: asset::Id,
from: asset::Id,
fixed_candidates: Arc<Vec<asset::Id>>,
) -> Result<Vec<asset::Id>> {
let candidates = BTreeSet::from_iter(fixed_candidates.iter().cloned());
// TODO: do dynamic candidate selection based on liquidity (tracked by #2750)
// Idea: each block, compute the per-asset candidate set and store it
// in the object store as a BTreeMap.
Ok(candidates.into_iter().collect())
) -> Pin<Box<dyn Stream<Item = Result<asset::Id>> + Send>> {
// Clone the fixed candidates Arc so it can be moved into the stream filter's future.
let fc = fixed_candidates.clone();
let mut dynamic_candidates = self
.ordered_routable_assets(&from)
.filter(move |c| {
future::ready(!fc.contains(c.as_ref().expect("failed to fetch candidate")))
})
.take(DYNAMIC_ASSET_LIMIT);
try_stream! {
// First stream the fixed candidates, so those can be processed while the dynamic candidates are fetched.
for candidate in fixed_candidates.iter() {
yield candidate.clone();
}

// Yield the liquidity-based candidates. Note that this _may_ include some assets already included in the fixed set.
while let Some(candidate) = dynamic_candidates
.next().await {
zbuc marked this conversation as resolved.
Show resolved Hide resolved
yield candidate.expect("failed to fetch candidate");
}
}
.boxed()
}

/// Returns a stream of [`asset::Id`] routable from a given asset, ordered by liquidity.
fn ordered_routable_assets(
&self,
from: &asset::Id,
) -> Pin<Box<dyn Stream<Item = Result<asset::Id>> + Send + 'static>> {
let prefix = state_key::internal::routable_assets::prefix(from);
tracing::trace!(prefix = ?EscapedByteSlice(&prefix), "searching for routable assets by liquidity");
self.nonverifiable_prefix_raw(&prefix)
.map(|entry| match entry {
Ok((_, v)) => Ok(asset::Id::decode(&*v)?),
Err(e) => Err(e),
})
.boxed()
}
}

Expand Down Expand Up @@ -252,5 +293,167 @@ pub(super) trait Inner: StateWrite {
self.nonverifiable_delete(state_key::internal::price_index::key(&pair12, &phi12, &id));
self.nonverifiable_delete(state_key::internal::price_index::key(&pair21, &phi21, &id));
}

/// Updates the nonverifiable liquidity indices given a [`Position`] in the direction specified by the [`DirectedTradingPair`].
/// An [`Option<Position>`] may be specified to allow for the case where a position is being updated.
async fn update_liquidity_index(
&mut self,
pair: DirectedTradingPair,
position: &Position,
prev: &Option<Position>,
) -> Result<()> {
tracing::debug!(?pair, "updating available liquidity indices");

let (new_a_from_b, current_a_from_b) = match (position.state, prev) {
(State::Opened, None) => {
// Add the new position's contribution to the index, no cancellation of the previous version necessary.

// Query the current available liquidity for this trading pair, or zero if the trading pair
// has no current liquidity.
let current_a_from_b = self
.nonverifiable_get_raw(&state_key::internal::routable_assets::a_from_b(&pair))
.await?
.map(|bytes| {
Amount::from_be_bytes(
bytes
.try_into()
.expect("liquidity index amount can always be parsed"),
)
})
.unwrap_or_default();

// Use the new reserves to compute `new_position_contribution`,
// the amount of asset A contributed by the position (i.e. the reserves of asset A).
let new_position_contribution = position
.reserves_for(pair.start)
.expect("specified position should match provided trading pair");

// Compute `new_A_from_B`.
let new_a_from_b =
// Add the contribution from the updated version.
current_a_from_b.saturating_add(&new_position_contribution);

tracing::debug!(?pair, current_liquidity = ?current_a_from_b, ?new_position_contribution, "newly opened position, adding contribution to existing available liquidity for trading pair");

(new_a_from_b, current_a_from_b)
}
(State::Opened, Some(prev)) => {
// Add the new position's contribution to the index, deleting the previous version's contribution.

// Query the current available liquidity for this trading pair, or zero if the trading pair
// has no current liquidity.
let current_a_from_b = self
.nonverifiable_get_raw(&state_key::internal::routable_assets::a_from_b(&pair))
.await?
.map(|bytes| {
Amount::from_be_bytes(
bytes
.try_into()
.expect("liquidity index amount can always be parsed"),
)
})
.unwrap_or_default();

// Use the previous reserves to compute `prev_position_contribution` (denominated in asset_1).
let prev_position_contribution = prev
.reserves_for(pair.start)
.expect("specified position should match provided trading pair");

// Use the new reserves to compute `new_position_contribution`,
// the amount of asset A contributed by the position (i.e. the reserves of asset A).
let new_position_contribution = position
.reserves_for(pair.start)
.expect("specified position should match provided trading pair");

// Compute `new_A_from_B`.
let new_a_from_b =
// Subtract the previous version of the position's contribution to represent that position no longer
// being correct, and add the contribution from the updated version.
(current_a_from_b.saturating_sub(&prev_position_contribution)).saturating_add(&new_position_contribution);

tracing::debug!(?pair, current_liquidity = ?current_a_from_b, ?new_position_contribution, ?prev_position_contribution, "updated position, adding new contribution and subtracting previous contribution to existing available liquidity for trading pair");

(new_a_from_b, current_a_from_b)
}
(State::Closed, Some(prev)) => {
// Compute the previous contribution and erase it from the current index

// Query the current available liquidity for this trading pair, or zero if the trading pair
// has no current liquidity.
let current_a_from_b = self
.nonverifiable_get_raw(&state_key::internal::routable_assets::a_from_b(&pair))
.await?
.map(|bytes| {
Amount::from_be_bytes(
bytes
.try_into()
.expect("liquidity index amount can always be parsed"),
)
})
.unwrap_or_default();

// Use the previous reserves to compute `prev_position_contribution` (denominated in asset_1).
let prev_position_contribution = prev
.reserves_for(pair.start)
.expect("specified position should match provided trading pair");

// Compute `new_A_from_B`.
let new_a_from_b =
// Subtract the previous version of the position's contribution to represent that position no longer
// being correct, and since the updated version is Closed, it has no contribution.
current_a_from_b.saturating_sub(&prev_position_contribution);

tracing::debug!(?pair, current_liquidity = ?current_a_from_b, ?prev_position_contribution, "closed position, subtracting previous contribution to existing available liquidity for trading pair");

(new_a_from_b, current_a_from_b)
}
(State::Withdrawn, _) | (State::Claimed, _) | (State::Closed, None) => {
// The position already went through the `Closed` state or was opened in the `Closed` state, so its contribution has already been subtracted.
return Ok(());
}
};

// Delete the existing key for this position if the reserve amount has changed.
if new_a_from_b != current_a_from_b {
self.nonverifiable_delete(
state_key::internal::routable_assets::key(&pair.start, current_a_from_b).to_vec(),
);
}

// Write the new key indicating that asset B is routable from asset A with `new_a_from_b` liquidity.
self.nonverifiable_put_raw(
state_key::internal::routable_assets::key(&pair.start, new_a_from_b).to_vec(),
pair.end.encode_to_vec(),
);
tracing::debug!(start = ?pair.start, end = ?pair.end, "marking routable from start -> end");

// Write the new lookup index storing `new_a_from_b` for this trading pair.
self.nonverifiable_put_raw(
state_key::internal::routable_assets::a_from_b(&pair).to_vec(),
new_a_from_b.to_be_bytes().to_vec(),
);
tracing::debug!(available_liquidity = ?new_a_from_b, ?pair, "marking available liquidity for trading pair");

Ok(())
}

async fn update_available_liquidity(
&mut self,
position: &Position,
prev_position: &Option<Position>,
) -> Result<()> {
// Since swaps may be performed in either direction, the available liquidity indices
// need to be calculated and stored for both the A -> B and B -> A directions.
let (a, b) = (position.phi.pair.asset_1(), position.phi.pair.asset_2());

// A -> B
self.update_liquidity_index(DirectedTradingPair::new(a, b), position, prev_position)
.await?;
// B -> A
self.update_liquidity_index(DirectedTradingPair::new(b, a), position, prev_position)
.await?;

Ok(())
}
}
impl<T: StateWrite + ?Sized> Inner for T {}
14 changes: 8 additions & 6 deletions crates/core/component/dex/src/component/router/path_search.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use std::sync::Arc;

use anyhow::Result;
use async_trait::async_trait;
use futures::StreamExt;
use penumbra_asset::asset;
use penumbra_num::fixpoint::U128x128;
use penumbra_storage::{StateDelta, StateRead};
Expand Down Expand Up @@ -95,22 +96,23 @@ async fn relax_path<S: StateRead + 'static>(
mut path: Path<S>,
fixed_candidates: Arc<Vec<asset::Id>>,
) -> Result<()> {
let candidates = path
let mut candidates = path
.state
.candidate_set(*path.end(), fixed_candidates)
.instrument(path.span.clone())
.await?;
.instrument(path.span.clone());

path.span.in_scope(|| {
tracing::debug!(degree = ?candidates.len(), ?candidates, "relaxing path");
tracing::debug!("relaxing path");
});

let mut js = JoinSet::new();
for new_end in candidates {
// while let Some(new_end) = candidates {

while let Some(new_end) = candidates.inner_mut().next().await {
let new_path = path.fork();
let cache2 = cache.clone();
js.spawn(async move {
if let Some(new_path) = new_path.extend_to(new_end).await? {
if let Some(new_path) = new_path.extend_to(new_end?).await? {
cache2.lock().consider(new_path)
}
anyhow::Ok(())
Expand Down
3 changes: 2 additions & 1 deletion crates/core/component/dex/src/lp/trading_function.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ impl TradingFunction {
input: Value,
reserves: &Reserves,
) -> anyhow::Result<(Value, Reserves, Value)> {
tracing::debug!(?input, ?reserves, "filling trade");
if input.asset_id == self.pair.asset_1() {
let (unfilled, new_reserves, output) = self.component.fill(input.amount, reserves)?;
Ok((
Expand Down Expand Up @@ -412,7 +413,7 @@ impl BareTradingFunction {
price_ratio.checked_div(&self.gamma()).expect("gamma != 0")
}

/// Converts an amount `delta_1` into `lambda_2`, using the id effective price inverse.
/// Converts an amount `delta_1` into `lambda_2`, using the inverse of the effective price.
pub fn convert_to_lambda_2(&self, delta_1: U128x128) -> anyhow::Result<U128x128> {
let lambda_2 = self.effective_price_inv() * delta_1;
Ok(lambda_2?)
Expand Down
38 changes: 38 additions & 0 deletions crates/core/component/dex/src/state_key.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,44 @@ pub(crate) mod internal {
use super::*;
use crate::lp::BareTradingFunction;

/// Find assets with liquidity positions from asset `from`, ordered by price.
pub mod routable_assets {
use penumbra_asset::asset;
use penumbra_num::Amount;

use super::*;

/// `A || be_bytes(A_from_B) => B` this will be an ordered encoding of every asset `B` directly routable to from `A`.
/// `a_from_b` represents the amount of `A` that can be bought with `B`.
/// The prefix query includes only the `A` portion, meaning the keys will be returned in order of liquidity.
pub fn prefix(from: &asset::Id) -> [u8; 39] {
let mut key = [0u8; 39];
key[0..7].copy_from_slice(b"dex/ra/");
key[7..7 + 32].copy_from_slice(&from.to_bytes());
key
}

/// `A || be_bytes(A_from_B) => B` this will be an ordered encoding of every asset `B` directly routable to from `A`.
/// `a_from_b` represents the amount of `A` that can be bought with `B`.
pub fn key(from: &asset::Id, a_from_b: Amount) -> [u8; 55] {
let mut key = [0u8; 55];
key[0..7].copy_from_slice(b"dex/ra/");
key[7..32 + 7].copy_from_slice(&from.to_bytes());
key[32 + 7..32 + 7 + 16].copy_from_slice(&a_from_b.to_be_bytes());
key
}

/// `(A, B) => A_from_B` this will encode the current amount of `A` tradable into `B` for every directly routable trading pair.
/// This index can be used to determine the key values for the [`super::key`] ordered index to perform updates efficiently.
pub fn a_from_b(pair: &DirectedTradingPair) -> [u8; 71] {
let mut key = [0u8; 71];
key[0..7].copy_from_slice(b"dex/ab/");
key[7..7 + 32].copy_from_slice(&pair.start.to_bytes());
key[7 + 32..7 + 32 + 32].copy_from_slice(&pair.end.to_bytes());
key
}
}

pub mod price_index {
use super::*;

Expand Down
Loading