Skip to content

Commit

Permalink
dex: redesign value circuit breaker
Browse files Browse the repository at this point in the history
Closes #4025

This commit sketches a new mechanism but does not fix the existing tests; it
needs to be picked up and pushed over the finish line. (We should be testing
this).
  • Loading branch information
hdevalence authored and zbuc committed Mar 22, 2024
1 parent 97c86a2 commit 2d4db73
Show file tree
Hide file tree
Showing 18 changed files with 146 additions and 111 deletions.
5 changes: 0 additions & 5 deletions crates/core/component/dex/src/circuit_breaker/mod.rs

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use cnidarium_component::ActionHandler;
use penumbra_proto::StateWriteProto as _;

use crate::{
component::{PositionManager, PositionRead},
component::{PositionManager, PositionRead, ValueCircuitBreaker},
event,
lp::{action::PositionOpen, position},
};
Expand Down Expand Up @@ -33,6 +33,13 @@ impl ActionHandler for PositionOpen {
async fn check_and_execute<S: StateWrite>(&self, mut state: S) -> Result<()> {
// Validate that the position ID doesn't collide
state.check_position_id_unused(&self.position.id()).await?;

// Credit the DEX for the inflows from this position.
// TODO: in a future PR, split current PositionManager to PositionManagerInner
// and fold this into a position open method
state.vcb_credit(self.position.reserves_1()).await?;
state.vcb_credit(self.position.reserves_2()).await?;

state.put_position(self.position.clone()).await?;
state.record_proto(event::position_open(self));
Ok(())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use decaf377::Fr;
use penumbra_proto::StateWriteProto;

use crate::{
component::{PositionManager, PositionRead},
component::{PositionManager, PositionRead, ValueCircuitBreaker},
event,
lp::{action::PositionWithdraw, position, Reserves},
};
Expand Down Expand Up @@ -90,6 +90,12 @@ impl ActionHandler for PositionWithdraw {
// the current reserves.
state.record_proto(event::position_withdraw(self, &metadata));

// Debit the DEX for the outflows from this position.
// TODO: in a future PR, split current PositionManager to PositionManagerInner
// and fold this into a position open method
state.vcb_debit(metadata.reserves_1()).await?;
state.vcb_debit(metadata.reserves_2()).await?;

// Finally, update the position. This has two steps:
// - update the state with the correct sequence number;
// - zero out the reserves, to prevent double-withdrawals.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,9 @@ impl ActionHandler for Swap {
swap_flow.1 += swap.body.delta_2_i;

// Set the batch swap flow for the trading pair.
state.put_swap_flow(&swap.body.trading_pair, swap_flow);
state
.put_swap_flow(&swap.body.trading_pair, swap_flow)
.await?;

// Record the swap commitment in the state.
let source = state.get_current_source().expect("source is set");
Expand Down
2 changes: 1 addition & 1 deletion crates/core/component/dex/src/component/arb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use penumbra_proto::StateWriteProto as _;
use penumbra_sct::component::clock::EpochRead;
use tracing::instrument;

use crate::{event, ExecutionCircuitBreaker, SwapExecution};
use crate::{component::ExecutionCircuitBreaker, event, SwapExecution};

use super::{
router::{RouteAndFill, RoutingParams},
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
mod execution;
mod value;

pub use execution::ExecutionCircuitBreaker;
pub use value::ValueCircuitBreaker;
Original file line number Diff line number Diff line change
@@ -1,42 +1,46 @@
use penumbra_asset::{asset::Id, Balance, Value};
use anyhow::{anyhow, Result};
use cnidarium::StateWrite;
use penumbra_asset::Value;
use penumbra_num::Amount;
use serde::{Deserialize, Serialize};

#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct ValueCircuitBreaker {
balance: Balance,
}

impl ValueCircuitBreaker {
pub fn tally(&mut self, balance: Balance) {
self.balance += balance;
}

pub fn check(&self) -> anyhow::Result<()> {
// No assets should ever be "required" by the circuit breaker's
// internal balance tracking, only "provided".
if let Some(r) = self.balance.required().next() {
return Err(anyhow::anyhow!(
"balance for asset {} is negative: -{}",
r.asset_id,
r.amount
));
}

use penumbra_proto::{StateReadProto, StateWriteProto};
use tonic::async_trait;

use crate::state_key;

/// Tracks the aggregate value of deposits in the DEX.
#[async_trait]
pub trait ValueCircuitBreaker: StateWrite {
/// Credits a deposit into the DEX.
async fn vcb_credit(&mut self, value: Value) -> Result<()> {
let balance: Amount = self
.get(&state_key::value_balance(&value.asset_id))
.await?
.unwrap_or_default();
let new_balance = balance
.checked_add(&value.amount)
.ok_or_else(|| anyhow!("overflowed balance while crediting value circuit breaker"))?;
self.put(state_key::value_balance(&value.asset_id), new_balance);
Ok(())
}

pub fn available(&self, asset_id: Id) -> Value {
self.balance
.provided()
.find(|b| b.asset_id == asset_id)
.unwrap_or(Value {
asset_id,
amount: Amount::from(0u64),
})
/// Debits a deposit from the DEX.
async fn vcb_debit(&mut self, value: Value) -> Result<()> {
let balance: Amount = self
.get(&state_key::value_balance(&value.asset_id))
.await?
.unwrap_or_default();
let new_balance = balance
.checked_sub(&value.amount)
.ok_or_else(|| anyhow!("underflowed balance while debiting value circuit breaker"))?;
self.put(state_key::value_balance(&value.asset_id), new_balance);
Ok(())
}
}

impl<T: StateWrite + ?Sized> ValueCircuitBreaker for T {}

/*
#[cfg(test)]
mod tests {
use std::sync::Arc;
Expand Down Expand Up @@ -248,3 +252,4 @@ mod tests {
.expect("unable to process batch swaps");
}
}
*/
47 changes: 42 additions & 5 deletions crates/core/component/dex/src/component/dex.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use crate::{

use super::{
router::{HandleBatchSwaps, RoutingParams},
Arbitrage, PositionManager,
Arbitrage, PositionManager, ValueCircuitBreaker,
};

pub struct Dex {}
Expand Down Expand Up @@ -209,12 +209,26 @@ pub trait StateWriteExt: StateWrite + StateReadExt {
self.object_put(state_key::config::dex_params_updated(), ())
}

fn set_output_data(
async fn set_output_data(
&mut self,
output_data: BatchSwapOutputData,
swap_execution_1_for_2: Option<SwapExecution>,
swap_execution_2_for_1: Option<SwapExecution>,
) {
) -> Result<()> {
// Debit the DEX for the swap outflows.
// Note that since we credited the DEX for _all_ inflows, we need to debit the
// unfilled amounts as well as the filled amounts.
self.vcb_debit(Value {
amount: output_data.unfilled_1 + output_data.lambda_1,
asset_id: output_data.trading_pair.asset_1,
})
.await?;
self.vcb_debit(Value {
amount: output_data.unfilled_2 + output_data.lambda_2,
asset_id: output_data.trading_pair.asset_2,
})
.await?;

// Write the output data to the state under a known key, for querying, ...
let height = output_data.height;
let trading_pair = output_data.trading_pair;
Expand Down Expand Up @@ -247,17 +261,40 @@ pub trait StateWriteExt: StateWrite + StateReadExt {
swap_execution_1_for_2,
swap_execution_2_for_1,
));

Ok(())
}

fn set_arb_execution(&mut self, height: u64, execution: SwapExecution) {
self.put(state_key::arb_execution(height), execution);
}

fn put_swap_flow(&mut self, trading_pair: &TradingPair, swap_flow: SwapFlow) {
async fn put_swap_flow(
&mut self,
trading_pair: &TradingPair,
swap_flow: SwapFlow,
) -> Result<()> {
// Credit the DEX for the swap inflows.
//
// Note that we credit the DEX for _all_ inflows, since we don't know
// how much will eventually be filled.
self.vcb_credit(Value {
amount: swap_flow.0,
asset_id: trading_pair.asset_1,
})
.await?;
self.vcb_credit(Value {
amount: swap_flow.1,
asset_id: trading_pair.asset_2,
})
.await?;

// TODO: replace with IM struct later
let mut swap_flows = self.swap_flows();
swap_flows.insert(*trading_pair, swap_flow);
self.object_put(state_key::swap_flows(), swap_flows)
self.object_put(state_key::swap_flows(), swap_flows);

Ok(())
}
}

Expand Down
3 changes: 3 additions & 0 deletions crates/core/component/dex/src/component/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,16 @@ pub mod router;

mod action_handler;
mod arb;
pub(crate) mod circuit_breaker;
mod dex;
mod flow;
pub(crate) mod position_manager;
mod swap_manager;

pub use self::metrics::register_metrics;
pub use arb::Arbitrage;
pub use circuit_breaker::ExecutionCircuitBreaker;
pub(crate) use circuit_breaker::ValueCircuitBreaker;
pub use dex::{Dex, StateReadExt, StateWriteExt};
pub use position_manager::{PositionManager, PositionRead};
pub use swap_manager::SwapManager;
Expand Down
27 changes: 0 additions & 27 deletions crates/core/component/dex/src/component/position_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ use penumbra_num::Amount;
use penumbra_proto::DomainType;
use penumbra_proto::{StateReadProto, StateWriteProto};

use crate::circuit_breaker::ValueCircuitBreaker;
use crate::lp::position::State;
use crate::{
lp::position::{self, Position},
Expand Down Expand Up @@ -570,32 +569,6 @@ pub(crate) trait Inner: StateWrite {
"updating position assets' aggregate balances"
);

let mut value_circuit_breaker: ValueCircuitBreaker = match self
.nonverifiable_get_raw(state_key::aggregate_value().as_bytes())
.await
.expect("able to retrieve value circuit breaker from nonverifiable storage")
{
Some(bytes) => serde_json::from_slice(&bytes).expect(
"able to deserialize stored value circuit breaker from nonverifiable storage",
),
None => ValueCircuitBreaker::default(),
};

// Add the change to the value circuit breaker for assets A and B.
value_circuit_breaker.tally(net_change_for_a);
value_circuit_breaker.tally(net_change_for_b);

// Confirm that the value circuit breaker is still within the limits.
// This call will panic if the value circuit breaker detects inflation.
value_circuit_breaker.check()?;

// Store the value circuit breaker back to nonconsensus storage with the updated tallies.
self.nonverifiable_put_raw(
state_key::aggregate_value().as_bytes().to_vec(),
serde_json::to_vec(&value_circuit_breaker)
.expect("able to serialize value circuit breaker for nonverifiable storage"),
);

Ok(())
}
}
Expand Down
34 changes: 4 additions & 30 deletions crates/core/component/dex/src/component/router/route_and_fill.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,13 @@ use penumbra_num::Amount;
use tracing::instrument;

use crate::{
circuit_breaker::ValueCircuitBreaker,
component::{
flow::SwapFlow,
router::{FillRoute, PathSearch, RoutingParams},
PositionManager, StateWriteExt,
ExecutionCircuitBreaker, PositionManager, StateWriteExt,
},
lp::position::MAX_RESERVE_AMOUNT,
state_key, BatchSwapOutputData, ExecutionCircuitBreaker, SwapExecution, TradingPair,
BatchSwapOutputData, SwapExecution, TradingPair,
};

use super::fill_route::FillError;
Expand Down Expand Up @@ -49,19 +48,6 @@ pub trait HandleBatchSwaps: StateWrite + Sized {
tracing::debug!(?delta_1, ?delta_2, ?trading_pair, "decrypted batch swaps");

let execution_circuit_breaker = ExecutionCircuitBreaker::default();
// Fetch the ValueCircuitBreaker prior to calling `route_and_fill`, so
// we know the total aggregate amount of each asset prior to executing and
// can ensure the total outflows don't exceed the total balances.
let value_circuit_breaker: ValueCircuitBreaker = match self
.nonverifiable_get_raw(state_key::aggregate_value().as_bytes())
.await
.expect("able to retrieve value circuit breaker from nonverifiable storage")
{
Some(bytes) => serde_json::from_slice(&bytes).expect(
"able to deserialize stored value circuit breaker from nonverifiable storage",
),
None => ValueCircuitBreaker::default(),
};

let swap_execution_1_for_2 = if delta_1.value() > 0 {
Some(
Expand Down Expand Up @@ -121,19 +107,6 @@ pub trait HandleBatchSwaps: StateWrite + Sized {
unfilled_2,
};

// Check that the output data doesn't exceed the ValueCircuitBreaker's quantities
// (i.e. we didn't outflow more value than existed within liquidity positions).
let available_asset_1 = value_circuit_breaker.available(trading_pair.asset_1());
let available_asset_2 = value_circuit_breaker.available(trading_pair.asset_2());
assert!(
output_data.lambda_1 <= available_asset_1.amount,
"asset 1 outflow exceeds available balance"
);
assert!(
output_data.lambda_2 <= available_asset_2.amount,
"asset 2 outflow exceeds available balance"
);

// Fetch the swap execution object that should have been modified during the routing and filling.
tracing::debug!(
?output_data,
Expand All @@ -142,7 +115,8 @@ pub trait HandleBatchSwaps: StateWrite + Sized {
);
Arc::get_mut(self)
.expect("expected state to have no other refs")
.set_output_data(output_data, swap_execution_1_for_2, swap_execution_2_for_1);
.set_output_data(output_data, swap_execution_1_for_2, swap_execution_2_for_1)
.await?;

Ok(())
}
Expand Down
8 changes: 6 additions & 2 deletions crates/core/component/dex/src/component/router/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1015,7 +1015,9 @@ async fn best_position_route_and_fill() -> anyhow::Result<()> {
// Set the batch swap flow for the trading pair.
Arc::get_mut(&mut state)
.unwrap()
.put_swap_flow(&trading_pair, swap_flow.clone());
.put_swap_flow(&trading_pair, swap_flow.clone())
.await
.unwrap();
let routing_params = state.routing_params().await.unwrap();
state
.handle_batch_swaps(trading_pair, swap_flow, 0u32.into(), 0, routing_params)
Expand Down Expand Up @@ -1154,7 +1156,9 @@ async fn multi_hop_route_and_fill() -> anyhow::Result<()> {
// Set the batch swap flow for the trading pair.
Arc::get_mut(&mut state)
.unwrap()
.put_swap_flow(&trading_pair, swap_flow.clone());
.put_swap_flow(&trading_pair, swap_flow.clone())
.await
.unwrap();
let routing_params = state.routing_params().await.unwrap();
state
.handle_batch_swaps(trading_pair, swap_flow, 0u32.into(), 0, routing_params)
Expand Down
2 changes: 1 addition & 1 deletion crates/core/component/dex/src/component/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use penumbra_proto::{
DomainType, StateReadProto,
};

use crate::ExecutionCircuitBreaker;
use super::ExecutionCircuitBreaker;
use crate::{
lp::position::{self, Position},
state_key, DirectedTradingPair, SwapExecution, TradingPair,
Expand Down
Loading

0 comments on commit 2d4db73

Please sign in to comment.