Skip to content

Commit

Permalink
Spawn GC task from RecentBlockCache constructor (#2109)
Browse files Browse the repository at this point in the history
  • Loading branch information
MartinquaXD authored Dec 1, 2023
1 parent 8fbe775 commit 0c1fcb0
Show file tree
Hide file tree
Showing 10 changed files with 75 additions and 176 deletions.
6 changes: 1 addition & 5 deletions crates/autopilot/src/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -443,8 +443,7 @@ pub async fn run(args: Arguments) {
block_retriever.clone(),
skip_event_sync_start,
));
let mut maintainers: Vec<Arc<dyn Maintaining>> =
vec![pool_fetcher.clone(), event_updater, Arc::new(db.clone())];
let mut maintainers: Vec<Arc<dyn Maintaining>> = vec![event_updater, Arc::new(db.clone())];

let gas_price_estimator = Arc::new(InstrumentedGasEstimator::new(
shared::gas_price_estimation::create_priority_estimator(
Expand Down Expand Up @@ -538,9 +537,6 @@ pub async fn run(args: Arguments) {
);
maintainers.push(broadcaster_event_updater);
}
if let Some(balancer) = balancer_pool_fetcher {
maintainers.push(balancer);
}
if let Some(uniswap_v3) = uniswap_v3_pool_fetcher {
maintainers.push(uniswap_v3);
}
Expand Down
48 changes: 4 additions & 44 deletions crates/driver/src/boundary/liquidity/uniswap/v2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,9 @@ use {
},
async_trait::async_trait,
contracts::{GPv2Settlement, IUniswapLikeRouter},
ethrpc::{
current_block::{self, CurrentBlockStream},
Web3,
},
futures::StreamExt,
ethrpc::{current_block::CurrentBlockStream, Web3},
shared::{
http_solver::model::TokenAmount,
maintenance::Maintaining,
sources::uniswap_v2::{
pair_provider::PairProvider,
pool_cache::PoolCache,
Expand All @@ -30,9 +25,8 @@ use {
},
std::{
collections::HashSet,
sync::{self, Arc, Mutex},
sync::{Arc, Mutex},
},
tracing::Instrument,
};

/// Median gas used per UniswapInteraction (v2).
Expand Down Expand Up @@ -150,18 +144,11 @@ where
config.missing_pool_cache_time,
);

let pool_cache = Arc::new(PoolCache::new(
Arc::new(PoolCache::new(
boundary::liquidity::cache_config(),
Arc::new(pool_fetcher),
blocks.clone(),
)?);

tokio::task::spawn(
cache_update(blocks.clone(), Arc::downgrade(&pool_cache))
.instrument(tracing::info_span!("uniswap_v2_cache")),
);

pool_cache
)?)
};

Ok(Box::new(UniswapLikeLiquidity::with_allowances(
Expand All @@ -172,33 +159,6 @@ where
)))
}

async fn cache_update(blocks: CurrentBlockStream, pool_cache: sync::Weak<PoolCache>) {
let mut blocks = current_block::into_stream(blocks);
loop {
let block = blocks
.next()
.await
.expect("block stream unexpectedly ended")
.number;

let pool_cache = match pool_cache.upgrade() {
Some(value) => value,
None => {
tracing::debug!("pool cache dropped; stopping update task");
break;
}
};

tracing::info_span!("maintenance", block)
.in_scope(|| async move {
if let Err(err) = pool_cache.run_maintenance().await {
tracing::warn!(?err, "error updating pool cache");
}
})
.await;
}
}

/// An allowance manager that always reports no allowances.
struct NoAllowanceManaging;

Expand Down
12 changes: 3 additions & 9 deletions crates/orderbook/src/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use {
fee_subsidy::{config::FeeSubsidyConfiguration, FeeSubsidizing},
gas_price::InstrumentedGasEstimator,
http_client::HttpClientFactory,
maintenance::{Maintaining, ServiceMaintenance},
maintenance::ServiceMaintenance,
metrics::{serve_metrics, DEFAULT_METRICS_PORT},
network::network_name,
oneinch_api::OneInchClientImpl,
Expand Down Expand Up @@ -518,12 +518,9 @@ pub async fn run(args: Arguments) {
ipfs,
));

let mut maintainers = vec![pool_fetcher as Arc<dyn Maintaining>];
if let Some(balancer) = balancer_pool_fetcher {
maintainers.push(balancer);
}
if let Some(uniswap_v3) = uniswap_v3_pool_fetcher {
maintainers.push(uniswap_v3);
let service_maintainer = ServiceMaintenance::new(vec![uniswap_v3]);
task::spawn(service_maintainer.run_maintenance_on_new_block(current_block_stream));
}

check_database_connection(orderbook.as_ref()).await;
Expand All @@ -548,9 +545,6 @@ pub async fn run(args: Arguments) {
native_price_estimator,
);

let service_maintainer = ServiceMaintenance::new(maintainers);
task::spawn(service_maintainer.run_maintenance_on_new_block(current_block_stream));

let mut metrics_address = args.bind_address;
metrics_address.set_port(DEFAULT_METRICS_PORT);
tracing::info!(%metrics_address, "serving metrics");
Expand Down
80 changes: 64 additions & 16 deletions crates/shared/src/recent_block_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use {
cached::{Cached, SizedCache},
ethcontract::BlockNumber,
ethrpc::current_block::CurrentBlockStream,
futures::FutureExt,
futures::{FutureExt, StreamExt},
itertools::Itertools,
prometheus::IntCounterVec,
std::{
Expand All @@ -41,6 +41,7 @@ use {
sync::{Arc, Mutex},
time::Duration,
},
tracing::Instrument,
};

/// How many liqudity sources should at most be fetched in a single chunk.
Expand Down Expand Up @@ -87,14 +88,21 @@ impl From<Block> for BlockNumber {
/// updates the N most recently used entries automatically when a new block
/// arrives.
pub struct RecentBlockCache<K, V, F>
where
K: CacheKey<V>,
F: CacheFetching<K, V>,
{
inner: Arc<Inner<K, V, F>>,
}

pub struct Inner<K, V, F>
where
K: CacheKey<V>,
F: CacheFetching<K, V>,
{
mutexed: Mutex<Mutexed<K, V>>,
number_of_blocks_to_cache: NonZeroU64,
fetcher: Arc<F>,
block_stream: CurrentBlockStream,
maximum_retries: u32,
delay_between_retries: Duration,
metrics: &'static Metrics,
Expand Down Expand Up @@ -133,7 +141,6 @@ struct Metrics {
#[metric(labels("cache_type"))]
recent_block_cache_misses: IntCounterVec,
}

impl<K, V, F> RecentBlockCache<K, V, F>
where
K: CacheKey<V>,
Expand All @@ -158,28 +165,63 @@ where
metrics_label: &'static str,
) -> Result<Self> {
let block = block_stream.borrow().number;
Ok(Self {
let inner = Arc::new(Inner {
mutexed: Mutex::new(Mutexed::new(
config.number_of_entries_to_auto_update,
block,
config.maximum_recent_block_age,
)),
number_of_blocks_to_cache: config.number_of_blocks_to_cache,
fetcher: Arc::new(fetcher),
block_stream,
maximum_retries: config.max_retries,
delay_between_retries: config.delay_between_retries,
metrics: Metrics::instance(observe::metrics::get_storage_registry()).unwrap(),
metrics_label,
requests: BoxRequestSharing::labelled("liquidity_fetching".into()),
})
});

Self::spawn_gc_task(
Arc::downgrade(&inner),
block_stream,
metrics_label.to_string(),
);

Ok(Self { inner })
}

pub async fn fetch(&self, keys: impl IntoIterator<Item = K>, block: Block) -> Result<Vec<V>> {
self.inner.fetch(keys, block).await
}

pub async fn update_cache(&self) -> Result<()> {
let new_block = self.block_stream.borrow().number;
self.update_cache_at_block(new_block).await
fn spawn_gc_task(
inner: std::sync::Weak<Inner<K, V, F>>,
block_stream: CurrentBlockStream,
label: String,
) {
tokio::task::spawn(
async move {
let mut stream = ethrpc::current_block::into_stream(block_stream);
while let Some(block) = stream.next().await {
let Some(inner) = inner.upgrade() else {
tracing::debug!("cache no longer in use; terminate GC task");
break;
};
if let Err(err) = inner.update_cache_at_block(block.number).await {
tracing::warn!(?err, "failed to update cache");
}
}
}
.instrument(tracing::info_span!("cache_maintenance", cache = label)),
);
}
}

impl<K, V, F> Inner<K, V, F>
where
K: CacheKey<V>,
V: Clone + Send + Sync + 'static,
F: CacheFetching<K, V>,
{
async fn update_cache_at_block(&self, new_block: u64) -> Result<()> {
let keys = self
.mutexed
Expand Down Expand Up @@ -239,7 +281,7 @@ where
fut.await.context("could not fetch liquidity")
}

pub async fn fetch(&self, keys: impl IntoIterator<Item = K>, block: Block) -> Result<Vec<V>> {
async fn fetch(&self, keys: impl IntoIterator<Item = K>, block: Block) -> Result<Vec<V>> {
let block = match block {
Block::Recent => None,
Block::Number(number) => Some(number),
Expand Down Expand Up @@ -509,7 +551,8 @@ mod tests {
block_stream,
"",
)
.unwrap();
.unwrap()
.inner;

let assert_keys_recently_used = |expected_keys: &[usize]| {
let cached_keys = cache
Expand Down Expand Up @@ -565,7 +608,8 @@ mod tests {
block_stream,
"",
)
.unwrap();
.unwrap()
.inner;

// Initial state on the block chain.
let initial_values = vec![
Expand Down Expand Up @@ -626,7 +670,8 @@ mod tests {
block_stream,
"",
)
.unwrap();
.unwrap()
.inner;

let value0 = TestValue::new(0, "0");
let value1 = TestValue::new(1, "1");
Expand Down Expand Up @@ -683,7 +728,8 @@ mod tests {
block_stream,
"",
)
.unwrap();
.unwrap()
.inner;

// cache at block 5
*values.lock().unwrap() = vec![TestValue::new(0, "foo")];
Expand Down Expand Up @@ -750,7 +796,8 @@ mod tests {
block_stream,
"",
)
.unwrap();
.unwrap()
.inner;

// Fetch 10 keys on block 10; but we only have capacity to update 2 of those in
// the background.
Expand Down Expand Up @@ -798,7 +845,8 @@ mod tests {
block_stream,
"",
)
.unwrap();
.unwrap()
.inner;
let key = TestKey(0);

// cache at block 7, most recent block is 10.
Expand Down
16 changes: 0 additions & 16 deletions crates/shared/src/sources/balancer_v2/pool_fetching.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ use {
},
crate::{
ethrpc::{Web3, Web3Transport},
maintenance::Maintaining,
recent_block_cache::{Block, CacheConfig},
token_info::TokenInfoFetching,
},
Expand Down Expand Up @@ -357,17 +356,6 @@ impl BalancerPoolFetching for BalancerPoolFetcher {
}
}

#[async_trait::async_trait]
impl Maintaining for BalancerPoolFetcher {
async fn run_maintenance(&self) -> Result<()> {
self.fetcher.run_maintenance().await
}

fn name(&self) -> &str {
"BalancerPoolFetcher"
}
}

/// Creates an aggregate fetcher for all supported pool factories.
async fn create_aggregate_pool_fetcher(
web3: Web3,
Expand Down Expand Up @@ -557,7 +545,6 @@ mod tests {
)
.await
.unwrap();
pool_fetcher.run_maintenance().await.unwrap();
let pair = TokenPair::new(
addr!("C02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2"),
addr!("C011a73ee8576Fb46F5E1c5751cA3B9Fe0af2a6F"),
Expand Down Expand Up @@ -604,9 +591,6 @@ mod tests {
pool_id_deny_list: Default::default(),
};

// index all the pools.
pool_fetcher.run_maintenance().await.unwrap();

// see what the subgraph says.
let client = BalancerSubgraphClient::for_chain(chain_id, Client::new()).unwrap();
let subgraph_pools = client.get_registered_pools().await.unwrap();
Expand Down
Loading

0 comments on commit 0c1fcb0

Please sign in to comment.