From ed4263790caa8cdf1b515f6250e70c1980406125 Mon Sep 17 00:00:00 2001
From: Romain Ruetschi
Date: Wed, 30 Mar 2022 11:32:03 +0200
Subject: [PATCH] Add metric for query cache hits (#2006)

* Add metric for query cache hits

* Use built-in library for showing metrics

* Cleanup

* Typo fix

* unclog

* Update changelog entry

Co-authored-by: Adi Seredinschi
---
 .../features/relayer/2036-caching-metrics.md |  2 +
 e2e/run.py                                   |  3 +-
 relayer/src/cache.rs                         | 34 +++++++++-----
 relayer/src/chain/handle/cache.rs            | 46 +++++++++++++++----
 telemetry/src/state.rs                       | 17 +++++++
 5 files changed, 80 insertions(+), 22 deletions(-)
 create mode 100644 .changelog/unreleased/features/relayer/2036-caching-metrics.md

diff --git a/.changelog/unreleased/features/relayer/2036-caching-metrics.md b/.changelog/unreleased/features/relayer/2036-caching-metrics.md
new file mode 100644
index 0000000000..4ba6cb3841
--- /dev/null
+++ b/.changelog/unreleased/features/relayer/2036-caching-metrics.md
@@ -0,0 +1,2 @@
+- Add a metric for query cache hits
+  ([#2036](https://github.com/informalsystems/ibc-rs/issues/2036))
\ No newline at end of file
diff --git a/e2e/run.py b/e2e/run.py
index 12aceb43df..721be3f386 100755
--- a/e2e/run.py
+++ b/e2e/run.py
@@ -151,6 +151,7 @@ def passive_packets(
     assert (len(unreceived) == 0), (unreceived,
                                     "unreceived acks mismatch (expected 0)")
 
+    # 9. Stop the relayer
     proc.kill()
 
 
@@ -191,7 +192,7 @@ def raw(c: Config, ibc0: ChainId, ibc1: ChainId, port_id: PortId) -> Tuple[Clien
 
     # The ChannelCloseInit message is currently denied by Gaia,
     # and requires a patch to be accepted.
-    # channel.close(c, ibc0 , ibc1, ibc0_conn_id,
+    # channel.close(c, ibc0, ibc1, ibc0_conn_id,
     #               ibc1_conn_id, ibc0_chan_id, ibc1_chan_id)
 
     return ibc0_client_id, ibc0_conn_id, ibc0_chan_id, ibc1_client_id, ibc1_conn_id, ibc1_chan_id
diff --git a/relayer/src/cache.rs b/relayer/src/cache.rs
index 2c4f69cf57..29fca4898a 100644
--- a/relayer/src/cache.rs
+++ b/relayer/src/cache.rs
@@ -23,6 +23,16 @@ const CHANNEL_CACHE_CAPACITY: u64 = 10_000;
 const CONNECTION_CACHE_CAPACITY: u64 = 10_000;
 const CLIENT_STATE_CACHE_CAPACITY: u64 = 10_000;
 
+/// Whether or not a result was in the cache (i.e. a cache hit)
+#[derive(Copy, Clone, Debug, PartialEq, Eq)]
+pub enum CacheStatus {
+    Hit,
+    Miss,
+}
+
+/// Alias for a result and its cache status.
+pub type CacheResult<A, E> = Result<(A, CacheStatus), E>;
+
 /// The main cache data structure, which comprises multiple sub-caches for caching
 /// different chain components, each with different time-to-live values.
 ///
@@ -84,20 +94,20 @@ impl Cache {
         &self,
         id: &PortChannelId,
         f: F,
-    ) -> Result<ChannelEnd, Error>
+    ) -> CacheResult<ChannelEnd, Error>
     where
         F: FnOnce() -> Result<ChannelEnd, Error>,
     {
         if let Some(chan) = self.channels.get(id) {
             // If cache hit, return it.
-            Ok(chan)
+            Ok((chan, CacheStatus::Hit))
         } else {
             // Only cache a channel end if the channel is open.
             let chan = f()?;
             if chan.state().is_open() {
                 self.channels.insert(id.clone(), chan.clone());
             }
-            Ok(chan)
+            Ok((chan, CacheStatus::Miss))
         }
     }
 
@@ -109,18 +119,18 @@ impl Cache {
         &self,
         id: &ConnectionId,
         f: F,
-    ) -> Result<ConnectionEnd, Error>
+    ) -> CacheResult<ConnectionEnd, Error>
     where
        F: FnOnce() -> Result<ConnectionEnd, Error>,
     {
         if let Some(conn) = self.connections.get(id) {
-            Ok(conn)
+            Ok((conn, CacheStatus::Hit))
         } else {
             let conn = f()?;
             if conn.state().is_open() {
                 self.connections.insert(id.clone(), conn.clone());
             }
-            Ok(conn)
+            Ok((conn, CacheStatus::Miss))
         }
     }
 
@@ -132,16 +142,16 @@ impl Cache {
         &self,
         id: &ClientId,
         f: F,
-    ) -> Result<AnyClientState, Error>
+    ) -> CacheResult<AnyClientState, Error>
     where
         F: FnOnce() -> Result<AnyClientState, Error>,
     {
         if let Some(state) = self.client_states.get(id) {
-            Ok(state)
+            Ok((state, CacheStatus::Hit))
         } else {
             let state = f()?;
             self.client_states.insert(id.clone(), state.clone());
-            Ok(state)
+            Ok((state, CacheStatus::Miss))
         }
     }
 
@@ -152,16 +162,16 @@ impl Cache {
     ///
     /// This value is cached with a small time-to-live so that the latest height
     /// query returns the same height if the same query is repeated within a small time frame.
-    pub fn get_or_try_update_latest_height_with<F>(&self, f: F) -> Result<Height, Error>
+    pub fn get_or_try_update_latest_height_with<F>(&self, f: F) -> CacheResult<Height, Error>
     where
         F: FnOnce() -> Result<Height, Error>,
     {
         if let Some(height) = self.latest_height.get(&()) {
-            Ok(height)
+            Ok((height, CacheStatus::Hit))
         } else {
             let height = f()?;
             self.latest_height.insert((), height);
-            Ok(height)
+            Ok((height, CacheStatus::Miss))
         }
     }
 }
diff --git a/relayer/src/chain/handle/cache.rs b/relayer/src/chain/handle/cache.rs
index dac48d1ea0..b9e49fae8c 100644
--- a/relayer/src/chain/handle/cache.rs
+++ b/relayer/src/chain/handle/cache.rs
@@ -34,12 +34,13 @@ use ibc_proto::ibc::core::connection::v1::QueryClientConnectionsRequest;
 use ibc_proto::ibc::core::connection::v1::QueryConnectionsRequest;
 use serde::{Serialize, Serializer};
 
-use crate::cache::Cache;
+use crate::cache::{Cache, CacheStatus};
 use crate::chain::handle::{ChainHandle, ChainRequest, Subscription};
 use crate::chain::tx::TrackedMsgs;
 use crate::chain::{HealthCheck, StatusResponse};
 use crate::config::ChainConfig;
 use crate::error::Error;
+use crate::telemetry;
 use crate::{connection::ConnectionMsgType, keyring::KeyEntry};
 
 #[derive(Debug, Clone)]
@@ -131,8 +132,15 @@ impl ChainHandle for CachingChainHandle {
 
     fn query_latest_height(&self) -> Result<Height, Error> {
         let handle = self.inner();
-        self.cache
-            .get_or_try_update_latest_height_with(|| handle.query_latest_height())
+        let (result, in_cache) = self
+            .cache
+            .get_or_try_update_latest_height_with(|| handle.query_latest_height())?;
+
+        if in_cache == CacheStatus::Hit {
+            telemetry!(query_cache_hit, &self.id(), "query_latest_height");
+        }
+
+        Ok(result)
     }
 
     fn query_clients(
@@ -150,10 +158,17 @@ impl ChainHandle for CachingChainHandle {
     ) -> Result<AnyClientState, Error> {
         let handle = self.inner();
         if height.is_zero() {
-            self.cache
+            let (result, in_cache) = self
+                .cache
                 .get_or_try_insert_client_state_with(client_id, || {
                     handle.query_client_state(client_id, height)
-                })
+                })?;
+
+            if in_cache == CacheStatus::Hit {
+                telemetry!(query_cache_hit, &self.id(), "query_client_state");
+            }
+
+            Ok(result)
         } else {
             handle.query_client_state(client_id, height)
         }
@@ -212,10 +227,17 @@ impl ChainHandle for CachingChainHandle {
     ) -> Result<ConnectionEnd, Error> {
         let handle = self.inner();
         if height.is_zero() {
-            self.cache
+            let (result, in_cache) = self
+                .cache
                 .get_or_try_insert_connection_with(connection_id, || {
                     handle.query_connection(connection_id, height)
-                })
+                })?;
+
+            if in_cache == CacheStatus::Hit {
+                telemetry!(query_cache_hit, &self.id(), "query_connection");
+            }
+
+            Ok(result)
         } else {
             handle.query_connection(connection_id, height)
         }
@@ -257,10 +279,16 @@ impl ChainHandle for CachingChainHandle {
     ) -> Result<ChannelEnd, Error> {
         let handle = self.inner();
         if height.is_zero() {
-            self.cache.get_or_try_insert_channel_with(
+            let (result, in_cache) = self.cache.get_or_try_insert_channel_with(
                 &PortChannelId::new(channel_id.clone(), port_id.clone()),
                 || handle.query_channel(port_id, channel_id, height),
-            )
+            )?;
+
+            if in_cache == CacheStatus::Hit {
+                telemetry!(query_cache_hit, &self.id(), "query_channel");
+            }
+
+            Ok(result)
         } else {
             handle.query_channel(port_id, channel_id, height)
         }
diff --git a/telemetry/src/state.rs b/telemetry/src/state.rs
index 25ddbfc240..6b36fac41b 100644
--- a/telemetry/src/state.rs
+++ b/telemetry/src/state.rs
@@ -53,6 +53,9 @@ pub struct TelemetryState {
 
     /// Number of queries emitted by the relayer, per chain and query type
     queries: Counter<u64>,
+
+    /// Number of cache hits for queries emitted by the relayer, per chain and query type
+    query_cache_hits: Counter<u64>,
 }
 
 impl TelemetryState {
@@ -144,6 +147,15 @@ impl TelemetryState {
 
         self.queries.add(1, labels);
     }
+
+    pub fn query_cache_hit(&self, chain_id: &ChainId, query_type: &'static str) {
+        let labels = &[
+            KeyValue::new("chain", chain_id.to_string()),
+            KeyValue::new("query_type", query_type),
+        ];
+
+        self.query_cache_hits.add(1, labels);
+    }
 }
 
 impl Default for TelemetryState {
@@ -190,6 +202,11 @@ impl Default for TelemetryState {
                     "Number of queries emitted by the relayer, per chain and query type",
                 )
                 .init(),
+
+            query_cache_hits: meter
+                .u64_counter("cache_hits")
+                .with_description("Number of cache hits for queries emitted by the relayer, per chain and query type")
+                .init(),
         }
     }
 }
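
For readers who want to exercise the hit-or-miss pattern above outside the relayer, here is a minimal, self-contained Rust sketch of the same idea. The SimpleCache type, its HashMap backing, the u64 value type, and the main driver are illustrative stand-ins only and are not part of this patch; the relayer's actual caches are moka-backed with per-entry time-to-live, and real call sites record hits through the telemetry! macro as shown in the hunks above.

use std::collections::HashMap;

/// Mirrors the patch's `CacheStatus`: whether a lookup was served from the cache.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
enum CacheStatus {
    Hit,
    Miss,
}

/// Mirrors the patch's `CacheResult` alias: the value paired with its cache status.
type CacheResult<A, E> = Result<(A, CacheStatus), E>;

/// Illustrative stand-in for the relayer cache; a plain HashMap keeps the
/// sketch self-contained (no TTL, no capacity bound).
struct SimpleCache {
    entries: HashMap<String, u64>,
}

impl SimpleCache {
    fn new() -> Self {
        Self {
            entries: HashMap::new(),
        }
    }

    /// Same shape as the `get_or_try_*_with` helpers in the patch: return the
    /// cached value tagged `Hit`, or run `f`, cache its result, and tag it `Miss`.
    fn get_or_try_insert_with<F, E>(&mut self, key: &str, f: F) -> CacheResult<u64, E>
    where
        F: FnOnce() -> Result<u64, E>,
    {
        if let Some(value) = self.entries.get(key) {
            Ok((*value, CacheStatus::Hit))
        } else {
            let value = f()?;
            self.entries.insert(key.to_string(), value);
            Ok((value, CacheStatus::Miss))
        }
    }
}

fn main() {
    let mut cache = SimpleCache::new();

    // First lookup misses: the query closure runs and its result is cached.
    let (height, status) = cache
        .get_or_try_insert_with("latest_height", || Ok::<u64, ()>(42))
        .unwrap();
    assert_eq!((height, status), (42, CacheStatus::Miss));

    // Second lookup hits and the closure is not consulted: this is the point
    // where a caller would bump the new counter, as the patch does with
    // telemetry!(query_cache_hit, &self.id(), "query_latest_height").
    let (height, status) = cache
        .get_or_try_insert_with("latest_height", || Ok::<u64, ()>(0))
        .unwrap();
    assert_eq!((height, status), (42, CacheStatus::Hit));

    println!("cached height = {}, status = {:?}", height, status);
}

The design keeps the fallible query closure untouched and only widens the success type from A to (A, CacheStatus), so call sites that do not report telemetry can simply destructure the tuple and drop the status.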