Spawn GC task from RecentBlockCache constructor #2109

Merged
merged 3 commits on Dec 1, 2023
Changes from 2 commits
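
The core change here is that the cache constructor spawns the garbage-collection task itself and hands it only a weak reference, so the task terminates on its own once the cache is dropped. A minimal, self-contained sketch of that pattern, with purely illustrative names (`Cache`, `Inner`, `update`) that are not taken from the codebase:

```rust
use std::sync::{Arc, Weak};
use tokio::time::{sleep, Duration};

struct Inner; // cached state would live here

impl Inner {
    async fn update(&self) {
        // refresh cached entries, e.g. on every new block
    }
}

struct Cache {
    inner: Arc<Inner>,
}

impl Cache {
    fn new() -> Self {
        let inner = Arc::new(Inner);
        // The background task holds only a `Weak` handle, so dropping the last
        // `Cache` (and with it the last strong `Arc`) makes `upgrade()` return
        // `None` and the loop exits.
        let weak: Weak<Inner> = Arc::downgrade(&inner);
        tokio::spawn(async move {
            loop {
                sleep(Duration::from_secs(1)).await;
                match weak.upgrade() {
                    Some(inner) => inner.update().await,
                    None => break, // owner dropped; terminate the task
                }
            }
        });
        Self { inner }
    }
}

#[tokio::main]
async fn main() {
    let cache = Cache::new();
    sleep(Duration::from_secs(3)).await;
    drop(cache); // the spawned task notices the drop on its next tick and stops
}
```

The diff below applies the same idea to `RecentBlockCache`: its state moves into an `Arc<Inner>`, the constructor spawns a task that upgrades a `Weak<Inner>` on every new block, and the task breaks out of its loop as soon as the upgrade fails. That in turn lets the `Maintaining` implementations and the manual maintenance plumbing in autopilot, orderbook, and the driver be deleted.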
6 changes: 1 addition & 5 deletions crates/autopilot/src/run.rs
@@ -443,8 +443,7 @@ pub async fn run(args: Arguments) {
block_retriever.clone(),
skip_event_sync_start,
));
let mut maintainers: Vec<Arc<dyn Maintaining>> =
vec![pool_fetcher.clone(), event_updater, Arc::new(db.clone())];
let mut maintainers: Vec<Arc<dyn Maintaining>> = vec![event_updater, Arc::new(db.clone())];

let gas_price_estimator = Arc::new(InstrumentedGasEstimator::new(
shared::gas_price_estimation::create_priority_estimator(
@@ -538,9 +537,6 @@ pub async fn run(args: Arguments) {
);
maintainers.push(broadcaster_event_updater);
}
if let Some(balancer) = balancer_pool_fetcher {
maintainers.push(balancer);
}
if let Some(uniswap_v3) = uniswap_v3_pool_fetcher {
maintainers.push(uniswap_v3);
}
48 changes: 4 additions & 44 deletions crates/driver/src/boundary/liquidity/uniswap/v2.rs
@@ -9,14 +9,9 @@ use {
},
async_trait::async_trait,
contracts::{GPv2Settlement, IUniswapLikeRouter},
ethrpc::{
current_block::{self, CurrentBlockStream},
Web3,
},
futures::StreamExt,
ethrpc::{current_block::CurrentBlockStream, Web3},
shared::{
http_solver::model::TokenAmount,
maintenance::Maintaining,
sources::uniswap_v2::{
pair_provider::PairProvider,
pool_cache::PoolCache,
@@ -30,9 +25,8 @@
},
std::{
collections::HashSet,
sync::{self, Arc, Mutex},
sync::{Arc, Mutex},
},
tracing::Instrument,
};

/// Median gas used per UniswapInteraction (v2).
@@ -150,18 +144,11 @@ where
config.missing_pool_cache_time,
);

let pool_cache = Arc::new(PoolCache::new(
Arc::new(PoolCache::new(
boundary::liquidity::cache_config(),
Arc::new(pool_fetcher),
blocks.clone(),
)?);

tokio::task::spawn(
cache_update(blocks.clone(), Arc::downgrade(&pool_cache))
.instrument(tracing::info_span!("uniswap_v2_cache")),
);

pool_cache
)?)
};

Ok(Box::new(UniswapLikeLiquidity::with_allowances(
@@ -172,33 +159,6 @@
)))
}

async fn cache_update(blocks: CurrentBlockStream, pool_cache: sync::Weak<PoolCache>) {
let mut blocks = current_block::into_stream(blocks);
loop {
let block = blocks
.next()
.await
.expect("block stream unexpectedly ended")
.number;

let pool_cache = match pool_cache.upgrade() {
Some(value) => value,
None => {
tracing::debug!("pool cache dropped; stopping update task");
break;
}
};

tracing::info_span!("maintenance", block)
.in_scope(|| async move {
if let Err(err) = pool_cache.run_maintenance().await {
tracing::warn!(?err, "error updating pool cache");
}
})
.await;
}
}

/// An allowance manager that always reports no allowances.
struct NoAllowanceManaging;

12 changes: 3 additions & 9 deletions crates/orderbook/src/run.rs
@@ -28,7 +28,7 @@ use {
fee_subsidy::{config::FeeSubsidyConfiguration, FeeSubsidizing},
gas_price::InstrumentedGasEstimator,
http_client::HttpClientFactory,
maintenance::{Maintaining, ServiceMaintenance},
maintenance::ServiceMaintenance,
metrics::{serve_metrics, DEFAULT_METRICS_PORT},
network::network_name,
oneinch_api::OneInchClientImpl,
@@ -518,12 +518,9 @@ pub async fn run(args: Arguments) {
ipfs,
));

let mut maintainers = vec![pool_fetcher as Arc<dyn Maintaining>];
if let Some(balancer) = balancer_pool_fetcher {
maintainers.push(balancer);
}
if let Some(uniswap_v3) = uniswap_v3_pool_fetcher {
maintainers.push(uniswap_v3);
let service_maintainer = ServiceMaintenance::new(vec![uniswap_v3]);
task::spawn(service_maintainer.run_maintenance_on_new_block(current_block_stream));
}

check_database_connection(orderbook.as_ref()).await;
@@ -548,9 +545,6 @@ pub async fn run(args: Arguments) {
native_price_estimator,
);

let service_maintainer = ServiceMaintenance::new(maintainers);
task::spawn(service_maintainer.run_maintenance_on_new_block(current_block_stream));

let mut metrics_address = args.bind_address;
metrics_address.set_port(DEFAULT_METRICS_PORT);
tracing::info!(%metrics_address, "serving metrics");
72 changes: 56 additions & 16 deletions crates/shared/src/recent_block_cache.rs
@@ -30,7 +30,7 @@ use {
cached::{Cached, SizedCache},
ethcontract::BlockNumber,
ethrpc::current_block::CurrentBlockStream,
futures::FutureExt,
futures::{FutureExt, StreamExt},
itertools::Itertools,
prometheus::IntCounterVec,
std::{
@@ -41,6 +41,7 @@ use {
sync::{Arc, Mutex},
time::Duration,
},
tracing::Instrument,
};

/// How many liquidity sources should at most be fetched in a single chunk.
@@ -87,14 +88,21 @@ impl From<Block> for BlockNumber {
/// updates the N most recently used entries automatically when a new block
/// arrives.
pub struct RecentBlockCache<K, V, F>
where
K: CacheKey<V>,
F: CacheFetching<K, V>,
{
inner: Arc<Inner<K, V, F>>,
}

pub struct Inner<K, V, F>
where
K: CacheKey<V>,
F: CacheFetching<K, V>,
{
mutexed: Mutex<Mutexed<K, V>>,
number_of_blocks_to_cache: NonZeroU64,
fetcher: Arc<F>,
block_stream: CurrentBlockStream,
maximum_retries: u32,
delay_between_retries: Duration,
metrics: &'static Metrics,
@@ -133,7 +141,6 @@ struct Metrics {
#[metric(labels("cache_type"))]
recent_block_cache_misses: IntCounterVec,
}

impl<K, V, F> RecentBlockCache<K, V, F>
where
K: CacheKey<V>,
@@ -158,28 +165,55 @@
metrics_label: &'static str,
) -> Result<Self> {
let block = block_stream.borrow().number;
Ok(Self {
let inner = Arc::new(Inner {
mutexed: Mutex::new(Mutexed::new(
config.number_of_entries_to_auto_update,
block,
config.maximum_recent_block_age,
)),
number_of_blocks_to_cache: config.number_of_blocks_to_cache,
fetcher: Arc::new(fetcher),
block_stream,
maximum_retries: config.max_retries,
delay_between_retries: config.delay_between_retries,
metrics: Metrics::instance(observe::metrics::get_storage_registry()).unwrap(),
metrics_label,
requests: BoxRequestSharing::labelled("liquidity_fetching".into()),
})
});

let inner_cloned = Arc::downgrade(&inner);
tokio::task::spawn(
async move {
Review comment (Contributor): nanonit: given the size and branch complexity of this nested task, it could be nice to factor it into a free method and keep `new` more contained.
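
A rough sketch of that refactor: the loop moves into a free function that `new` calls with a downgraded handle. The name `spawn_gc_task` and the explicit `Send + Sync + 'static` bounds are assumptions here (the crate's `CacheKey` and `CacheFetching` traits may already imply them):

```rust
// Hypothetical helper: everything the spawned loop needs is passed in, so `new`
// only has to call `spawn_gc_task(block_stream, Arc::downgrade(&inner), metrics_label)`.
fn spawn_gc_task<K, V, F>(
    block_stream: CurrentBlockStream,
    inner: std::sync::Weak<Inner<K, V, F>>,
    metrics_label: &'static str,
) where
    K: CacheKey<V> + Send + Sync + 'static,
    V: Clone + Send + Sync + 'static,
    F: CacheFetching<K, V> + Send + Sync + 'static,
{
    use futures::StreamExt;
    use tracing::Instrument;

    tokio::task::spawn(
        async move {
            let mut stream = ethrpc::current_block::into_stream(block_stream);
            while let Some(block) = stream.next().await {
                // Stop as soon as the owning cache has been dropped.
                let Some(inner) = inner.upgrade() else {
                    tracing::debug!("cache no longer in use; terminate GC task");
                    break;
                };
                if let Err(err) = inner.update_cache_at_block(block.number).await {
                    tracing::warn!(?err, "failed to update cache");
                }
            }
        }
        .instrument(tracing::info_span!("cache_maintenance", cache = metrics_label)),
    );
}
```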

let mut stream = ethrpc::current_block::into_stream(block_stream);
while let Some(block) = stream.next().await {
let Some(inner) = inner_cloned.upgrade() else {
tracing::debug!("cache no longer in use; terminate GC task");
break;
};
if let Err(err) = inner.update_cache_at_block(block.number).await {
tracing::warn!(?err, "filed to update cache");
Suggested change (Contributor):
- tracing::warn!(?err, "filed to update cache");
+ tracing::warn!(?err, "failed to update cache");

}
}
}
.instrument(tracing::info_span!(
"cache_maintenance",
cache = metrics_label
)),
);

Ok(Self { inner })
}

pub async fn update_cache(&self) -> Result<()> {
let new_block = self.block_stream.borrow().number;
self.update_cache_at_block(new_block).await
pub async fn fetch(&self, keys: impl IntoIterator<Item = K>, block: Block) -> Result<Vec<V>> {
self.inner.fetch(keys, block).await
}
}

impl<K, V, F> Inner<K, V, F>
where
K: CacheKey<V>,
V: Clone + Send + Sync + 'static,
F: CacheFetching<K, V>,
{
async fn update_cache_at_block(&self, new_block: u64) -> Result<()> {
let keys = self
.mutexed
@@ -239,7 +273,7 @@ where
fut.await.context("could not fetch liquidity")
}

pub async fn fetch(&self, keys: impl IntoIterator<Item = K>, block: Block) -> Result<Vec<V>> {
async fn fetch(&self, keys: impl IntoIterator<Item = K>, block: Block) -> Result<Vec<V>> {
let block = match block {
Block::Recent => None,
Block::Number(number) => Some(number),
@@ -509,7 +543,8 @@ mod tests {
block_stream,
"",
)
.unwrap();
.unwrap()
.inner;

let assert_keys_recently_used = |expected_keys: &[usize]| {
let cached_keys = cache
@@ -565,7 +600,8 @@
block_stream,
"",
)
.unwrap();
.unwrap()
.inner;

// Initial state on the block chain.
let initial_values = vec![
@@ -626,7 +662,8 @@
block_stream,
"",
)
.unwrap();
.unwrap()
.inner;

let value0 = TestValue::new(0, "0");
let value1 = TestValue::new(1, "1");
@@ -683,7 +720,8 @@
block_stream,
"",
)
.unwrap();
.unwrap()
.inner;

// cache at block 5
*values.lock().unwrap() = vec![TestValue::new(0, "foo")];
@@ -750,7 +788,8 @@
block_stream,
"",
)
.unwrap();
.unwrap()
.inner;

// Fetch 10 keys on block 10; but we only have capacity to update 2 of those in
// the background.
@@ -798,7 +837,8 @@
block_stream,
"",
)
.unwrap();
.unwrap()
.inner;
let key = TestKey(0);

// cache at block 7, most recent block is 10.
16 changes: 0 additions & 16 deletions crates/shared/src/sources/balancer_v2/pool_fetching.rs
@@ -27,7 +27,6 @@ use {
},
crate::{
ethrpc::{Web3, Web3Transport},
maintenance::Maintaining,
recent_block_cache::{Block, CacheConfig},
token_info::TokenInfoFetching,
},
@@ -357,17 +356,6 @@ impl BalancerPoolFetching for BalancerPoolFetcher {
}
}

#[async_trait::async_trait]
impl Maintaining for BalancerPoolFetcher {
async fn run_maintenance(&self) -> Result<()> {
self.fetcher.run_maintenance().await
}

fn name(&self) -> &str {
"BalancerPoolFetcher"
}
}

/// Creates an aggregate fetcher for all supported pool factories.
async fn create_aggregate_pool_fetcher(
web3: Web3,
@@ -557,7 +545,6 @@ mod tests {
)
.await
.unwrap();
pool_fetcher.run_maintenance().await.unwrap();
let pair = TokenPair::new(
addr!("C02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2"),
addr!("C011a73ee8576Fb46F5E1c5751cA3B9Fe0af2a6F"),
@@ -604,9 +591,6 @@
pool_id_deny_list: Default::default(),
};

// index all the pools.
pool_fetcher.run_maintenance().await.unwrap();

// see what the subgraph says.
let client = BalancerSubgraphClient::for_chain(chain_id, Client::new()).unwrap();
let subgraph_pools = client.get_registered_pools().await.unwrap();
@@ -3,11 +3,7 @@

use {
super::internal::InternalPoolFetching,
crate::{
maintenance::Maintaining,
recent_block_cache::Block,
sources::balancer_v2::pools::Pool,
},
crate::{recent_block_cache::Block, sources::balancer_v2::pools::Pool},
anyhow::Result,
ethcontract::H256,
futures::future,
@@ -53,21 +49,3 @@ impl InternalPoolFetching for Aggregate {
.collect())
}
}

#[async_trait::async_trait]
impl Maintaining for Aggregate {
async fn run_maintenance(&self) -> Result<()> {
future::try_join_all(
self.fetchers
.iter()
.map(|fetcher| fetcher.run_maintenance()),
)
.await?;

Ok(())
}

fn name(&self) -> &str {
"BalancerPoolFetcher"
}
}