Skip to content

Commit

Permalink
Refactor recent block cache metrics to macro
Browse files Browse the repository at this point in the history
As part of this there is now a label value that differentiates between
different caches rather than sharing the same metric.
  • Loading branch information
vkgnosis committed Aug 5, 2022
1 parent bc85274 commit e518327
Show file tree
Hide file tree
Showing 10 changed files with 40 additions and 160 deletions.
6 changes: 1 addition & 5 deletions crates/e2e/tests/e2e/services.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,7 @@ use shared::{
recent_block_cache::CacheConfig,
signature_validator::Web3SignatureValidator,
sources::uniswap_v2::{
self,
pair_provider::PairProvider,
pool_cache::{NoopPoolCacheMetrics, PoolCache},
pool_fetching::PoolFetcher,
self, pair_provider::PairProvider, pool_cache::PoolCache, pool_fetching::PoolFetcher,
},
Web3,
};
Expand Down Expand Up @@ -200,7 +197,6 @@ impl OrderbookServices {
},
Arc::new(PoolFetcher::uniswap(pair_provider, web3.clone())),
current_block_stream.clone(),
Arc::new(NoopPoolCacheMetrics),
)
.unwrap();
let gas_estimator = Arc::new(web3.clone());
Expand Down
2 changes: 0 additions & 2 deletions crates/orderbook/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,6 @@ async fn main() {
cache_config,
Arc::new(pool_aggregator),
current_block_stream.clone(),
metrics.clone(),
)
.expect("failed to create pool cache"),
);
Expand All @@ -279,7 +278,6 @@ async fn main() {
token_info_fetcher.clone(),
cache_config,
current_block_stream.clone(),
metrics.clone(),
client.clone(),
&contracts,
args.shared.balancer_pool_deny_list,
Expand Down
34 changes: 0 additions & 34 deletions crates/orderbook/src/metrics.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,9 @@
use anyhow::Result;
use gas_estimation::GasPrice1559;
use prometheus::{Gauge, HistogramOpts, HistogramVec, IntCounter, IntCounterVec, IntGauge, Opts};
use shared::sources::{
balancer_v2::pool_fetching::BalancerPoolCacheMetrics, uniswap_v2::pool_cache::PoolCacheMetrics,
};
use std::time::Duration;

pub struct Metrics {
pool_cache_hits: IntCounter,
pool_cache_misses: IntCounter,
/// Gas estimate metrics
gas_price: Gauge,
price_estimates: IntCounterVec,
Expand All @@ -26,17 +21,6 @@ impl Metrics {
pub fn new() -> Result<Self> {
let registry = global_metrics::get_metrics_registry();

let pool_cache_hits = IntCounter::new(
"pool_cache_hits",
"Number of cache hits in the pool fetcher cache.",
)?;
registry.register(Box::new(pool_cache_hits.clone()))?;
let pool_cache_misses = IntCounter::new(
"pool_cache_misses",
"Number of cache misses in the pool fetcher cache.",
)?;
registry.register(Box::new(pool_cache_misses.clone()))?;

let opts = Opts::new("gas_price", "Gas price estimate over time.");
let gas_price = Gauge::with_opts(opts).unwrap();
registry.register(Box::new(gas_price.clone()))?;
Expand Down Expand Up @@ -91,8 +75,6 @@ impl Metrics {
registry.register(Box::new(auction_price_estimate_timeouts.clone()))?;

Ok(Self {
pool_cache_hits,
pool_cache_misses,
gas_price,
price_estimates,
native_price_cache,
Expand All @@ -106,13 +88,6 @@ impl Metrics {
}
}

impl PoolCacheMetrics for Metrics {
fn pools_fetched(&self, cache_hits: usize, cache_misses: usize) {
self.pool_cache_hits.inc_by(cache_hits as u64);
self.pool_cache_misses.inc_by(cache_misses as u64);
}
}

impl crate::solvable_orders::AuctionMetrics for Metrics {
fn auction_updated(
&self,
Expand Down Expand Up @@ -161,15 +136,6 @@ impl shared::price_estimation::instrumented::Metrics for Metrics {
}
}

impl BalancerPoolCacheMetrics for Metrics {
fn pools_fetched(&self, cache_hits: usize, cache_misses: usize) {
// We may want to distinguish cache metrics between the different
// liquidity sources in the future, for now just use the same counters.
self.pool_cache_hits.inc_by(cache_hits as u64);
self.pool_cache_misses.inc_by(cache_misses as u64);
}
}

impl shared::price_estimation::native_price_cache::Metrics for Metrics {
fn native_price_cache(&self, misses: usize, hits: usize) {
self.native_price_cache
Expand Down
3 changes: 0 additions & 3 deletions crates/shared/src/price_estimation/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -345,7 +345,6 @@ mod tests {
use crate::sources::balancer_v2::pool_fetching::BalancerContracts;
use crate::sources::balancer_v2::BalancerFactoryKind;
use crate::sources::uniswap_v2;
use crate::sources::uniswap_v2::pool_cache::NoopPoolCacheMetrics;
use crate::token_info::TokenInfoFetcher;
use crate::transport::http::HttpTransport;
use crate::Web3;
Expand Down Expand Up @@ -414,7 +413,6 @@ mod tests {
current_block_stream(web3.clone(), Duration::from_secs(1))
.await
.unwrap(),
Arc::new(NoopPoolCacheMetrics),
)
.unwrap(),
);
Expand All @@ -432,7 +430,6 @@ mod tests {
token_info.clone(),
Default::default(),
current_block_stream.clone(),
Arc::new(crate::sources::balancer_v2::pool_fetching::NoopBalancerPoolCacheMetrics),
client.clone(),
&contracts,
Default::default(),
Expand Down
48 changes: 29 additions & 19 deletions crates/shared/src/recent_block_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use crate::current_block::{self, CurrentBlockStream};
use anyhow::Result;
use ethcontract::BlockNumber;
use lru::LruCache;
use prometheus::IntCounterVec;
use std::{
collections::{hash_map::Entry, BTreeMap, HashMap, HashSet},
hash::Hash,
Expand Down Expand Up @@ -69,7 +70,7 @@ impl From<Block> for BlockNumber {
///
/// Caches on-chain data for a specific number of blocks and automatically updates the N most
/// recently used entries automatically when a new block arrives.
pub struct RecentBlockCache<K, V, F, M>
pub struct RecentBlockCache<K, V, F>
where
K: CacheKey<V>,
F: CacheFetching<K, V>,
Expand All @@ -78,9 +79,10 @@ where
number_of_blocks_to_cache: NonZeroU64,
fetcher: F,
block_stream: CurrentBlockStream,
metrics: M,
maximum_retries: u32,
delay_between_retries: Duration,
metrics: &'static Metrics,
metrics_label: &'static str,
}

#[derive(Clone, Copy, Debug)]
Expand All @@ -104,21 +106,22 @@ impl Default for CacheConfig {
}
}

pub trait CacheMetrics: Send + Sync {
fn entries_fetched(&self, cache_hits: usize, cache_misses: usize);
}
#[derive(prometheus_metric_storage::MetricStorage)]
struct Metrics {
/// hits
#[metric(labels("cache_type"))]
recent_block_cache_hits: IntCounterVec,

pub struct NoopCacheMetrics;
impl CacheMetrics for NoopCacheMetrics {
fn entries_fetched(&self, _: usize, _: usize) {}
/// misses
#[metric(labels("cache_type"))]
recent_block_cache_misses: IntCounterVec,
}

impl<K, V, F, M> RecentBlockCache<K, V, F, M>
impl<K, V, F> RecentBlockCache<K, V, F>
where
K: CacheKey<V>,
V: Clone,
F: CacheFetching<K, V>,
M: CacheMetrics,
{
/// number_of_blocks_to_cache: Previous blocks stay cached until the block is this much older
/// than the current block. If there is a request for a block that is already too old then the
Expand All @@ -133,7 +136,7 @@ where
config: CacheConfig,
fetcher: F,
block_stream: CurrentBlockStream,
metrics: M,
metrics_label: &'static str,
) -> Result<Self> {
let block = current_block::block_number(&block_stream.borrow())?;
Ok(Self {
Expand All @@ -145,9 +148,10 @@ where
number_of_blocks_to_cache: config.number_of_blocks_to_cache,
fetcher,
block_stream,
metrics,
maximum_retries: config.max_retries,
delay_between_retries: config.delay_between_retries,
metrics: Metrics::instance(global_metrics::get_metric_storage_registry()).unwrap(),
metrics_label,
})
}

Expand Down Expand Up @@ -219,7 +223,13 @@ where
}

self.metrics
.entries_fetched(cache_hit_count, cache_misses.len());
.recent_block_cache_hits
.with_label_values(&[self.metrics_label])
.inc_by(cache_hit_count as u64);
self.metrics
.recent_block_cache_misses
.with_label_values(&[self.metrics_label])
.inc_by(cache_misses.len() as u64);

if cache_misses.is_empty() {
return Ok(cache_hits);
Expand Down Expand Up @@ -393,7 +403,7 @@ mod tests {
},
fetcher,
receiver,
NoopCacheMetrics,
"",
)
.unwrap();

Expand Down Expand Up @@ -447,7 +457,7 @@ mod tests {
},
fetcher,
receiver,
NoopCacheMetrics,
"",
)
.unwrap();

Expand Down Expand Up @@ -495,7 +505,7 @@ mod tests {
},
fetcher,
receiver,
NoopCacheMetrics,
"",
)
.unwrap();

Expand Down Expand Up @@ -553,7 +563,7 @@ mod tests {
},
fetcher,
receiver,
NoopCacheMetrics,
"",
)
.unwrap();

Expand Down Expand Up @@ -618,7 +628,7 @@ mod tests {
},
fetcher,
receiver,
NoopCacheMetrics,
"",
)
.unwrap();

Expand Down Expand Up @@ -660,7 +670,7 @@ mod tests {
},
fetcher,
receiver,
NoopCacheMetrics,
"",
)
.unwrap();
let key = TestKey(0);
Expand Down
4 changes: 0 additions & 4 deletions crates/shared/src/sources/balancer_v2/pool_fetching.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ mod internal;
mod pool_storage;
mod registry;

pub use self::cache::{BalancerPoolCacheMetrics, NoopBalancerPoolCacheMetrics};
use self::{
aggregate::Aggregate, cache::Cache, internal::InternalPoolFetching, registry::Registry,
};
Expand Down Expand Up @@ -221,7 +220,6 @@ impl BalancerPoolFetcher {
token_infos: Arc<dyn TokenInfoFetching>,
config: CacheConfig,
block_stream: CurrentBlockStream,
metrics: Arc<dyn BalancerPoolCacheMetrics>,
client: Client,
contracts: &BalancerContracts,
deny_listed_pool_ids: Vec<H256>,
Expand All @@ -231,7 +229,6 @@ impl BalancerPoolFetcher {
create_aggregate_pool_fetcher(pool_initializer, token_infos, contracts).await?,
config,
block_stream,
metrics,
)?);

Ok(Self {
Expand Down Expand Up @@ -451,7 +448,6 @@ mod tests {
token_info_fetcher,
Default::default(),
block_stream,
Arc::new(NoopBalancerPoolCacheMetrics),
Default::default(),
&contracts,
deny_list,
Expand Down
26 changes: 3 additions & 23 deletions crates/shared/src/sources/balancer_v2/pool_fetching/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,28 +7,15 @@ use super::internal::InternalPoolFetching;
use crate::{
current_block::CurrentBlockStream,
maintenance::Maintaining,
recent_block_cache::{
Block, CacheConfig, CacheFetching, CacheKey, CacheMetrics, RecentBlockCache,
},
recent_block_cache::{Block, CacheConfig, CacheFetching, CacheKey, RecentBlockCache},
sources::balancer_v2::pools::Pool,
};
use anyhow::Result;
use ethcontract::H256;
use std::{collections::HashSet, sync::Arc};

/// Trait used for Balancer pool cache metrics.
pub trait BalancerPoolCacheMetrics: Send + Sync {
fn pools_fetched(&self, cache_hits: usize, cache_misses: usize);
}

pub struct NoopBalancerPoolCacheMetrics;
impl BalancerPoolCacheMetrics for NoopBalancerPoolCacheMetrics {
fn pools_fetched(&self, _: usize, _: usize) {}
}

/// Internal type alias used for inner recent block cache.
type PoolCache<Inner> =
RecentBlockCache<H256, Pool, CacheFetcher<Inner>, Arc<dyn BalancerPoolCacheMetrics>>;
type PoolCache<Inner> = RecentBlockCache<H256, Pool, CacheFetcher<Inner>>;

/// A cached pool fetcher that wraps an inner `InternalPoolFetching`
/// implementation.
Expand All @@ -48,11 +35,10 @@ where
inner: Inner,
config: CacheConfig,
block_stream: CurrentBlockStream,
metrics: Arc<dyn BalancerPoolCacheMetrics>,
) -> Result<Self> {
let inner = Arc::new(inner);
let fetcher = CacheFetcher(inner.clone());
let cache = RecentBlockCache::new(config, fetcher, block_stream, metrics)?;
let cache = RecentBlockCache::new(config, fetcher, block_stream, "balancerv2")?;
Ok(Self { inner, cache })
}
}
Expand Down Expand Up @@ -111,9 +97,3 @@ where
self.0.pools_by_id(pool_ids, at_block).await
}
}

impl CacheMetrics for Arc<dyn BalancerPoolCacheMetrics> {
fn entries_fetched(&self, cache_hits: usize, cache_misses: usize) {
self.pools_fetched(cache_hits, cache_misses)
}
}
Loading

0 comments on commit e518327

Please sign in to comment.