Skip to content

Commit

Permalink
Use OnceCell to manage liquidity source initialization (#2055)
Browse files Browse the repository at this point in the history
  • Loading branch information
MartinquaXD authored Nov 15, 2023
1 parent bd20530 commit 32852fe
Showing 1 changed file with 19 additions and 18 deletions.
37 changes: 19 additions & 18 deletions crates/solver/src/liquidity_collector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@ use {
crate::liquidity::Liquidity,
anyhow::Result,
model::TokenPair,
once_cell::sync::OnceCell,
shared::{baseline_solver::BaseTokens, recent_block_cache::Block},
std::{collections::HashSet, future::Future, sync::Arc, time::Duration},
tokio::sync::Mutex,
tracing::Instrument,
};

Expand Down Expand Up @@ -51,7 +51,7 @@ impl LiquidityCollecting for LiquidityCollector {
/// succeeds. Until the liquidity source has been initialised no liquidity will
/// be provided.
pub struct BackgroundInitLiquiditySource<L> {
liquidity_source: Arc<Mutex<Option<L>>>,
liquidity_source: Arc<OnceCell<L>>,
}

impl<L> BackgroundInitLiquiditySource<L> {
Expand All @@ -67,7 +67,7 @@ impl<L> BackgroundInitLiquiditySource<L> {
.liquidity_enabled
.with_label_values(&[label])
.set(0);
let liquidity_source: Arc<Mutex<Option<L>>> = Default::default();
let liquidity_source = Arc::new(OnceCell::new());
let inner = liquidity_source.clone();
let inner_label = label.to_owned();
tokio::task::spawn(
Expand All @@ -82,12 +82,20 @@ impl<L> BackgroundInitLiquiditySource<L> {
tokio::time::sleep(retry_init_timeout).await;
}
Ok(source) => {
let _ = inner.lock().await.insert(source);
tracing::debug!("successfully initialised liquidity source");
Metrics::get()
.liquidity_enabled
.with_label_values(&[&inner_label])
.inc();
if inner.set(source).is_err() {
// should never happen but if it does we want to know about it ASAP
tracing::error!(
source = inner_label,
"liquidity source already initialized"
);
} else {
tracing::debug!("successfully initialised liquidity source");
Metrics::get()
.liquidity_enabled
.with_label_values(&[&inner_label])
.inc();
}

break;
}
}
Expand All @@ -110,15 +118,8 @@ where
pairs: HashSet<TokenPair>,
at_block: Block,
) -> Result<Vec<Liquidity>> {
// Use `try_lock` to not block caller when the lock is currently being held for
// a potentially very slow init logic.
let liquidity_source = match self.liquidity_source.try_lock() {
Ok(lock) => lock,
Err(_) => return Ok(vec![]),
};

match &*liquidity_source {
Some(initialised_source) => initialised_source.get_liquidity(pairs, at_block).await,
match self.liquidity_source.get() {
Some(source) => source.get_liquidity(pairs, at_block).await,
None => Ok(vec![]),
}
}
Expand Down

0 comments on commit 32852fe

Please sign in to comment.