Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add metric for query cache hits #2006

Merged
merged 7 commits into from
Mar 30, 2022
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion e2e/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@ def passive_packets(

assert (len(unreceived) == 0), (unreceived,
"unreceived acks mismatch (expected 0)")

# 9.Stop the relayer
proc.kill()

Expand Down Expand Up @@ -191,7 +192,7 @@ def raw(c: Config, ibc0: ChainId, ibc1: ChainId, port_id: PortId) -> Tuple[Clien

# The ChannelCloseInit message is currently denied by Gaia,
# and requires a patch to be accepted.
# channel.close(c, ibc0 , ibc1, ibc0_conn_id,
# channel.close(c, ibc0, ibc1, ibc0_conn_id,
# ibc1_conn_id, ibc0_chan_id, ibc1_chan_id)

return ibc0_client_id, ibc0_conn_id, ibc0_chan_id, ibc1_client_id, ibc1_conn_id, ibc1_chan_id
Expand Down
34 changes: 22 additions & 12 deletions relayer/src/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,16 @@ const CHANNEL_CACHE_CAPACITY: u64 = 10_000;
const CONNECTION_CACHE_CAPACITY: u64 = 10_000;
const CLIENT_STATE_CACHE_CAPACITY: u64 = 10_000;

/// Whether or not a result was in cache (ie. a cache hit)
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum CacheStatus {
Hit,
Miss,
}

/// Alias for a result and its cache status.
pub type CacheResult<A, E> = Result<(A, CacheStatus), E>;

/// The main cache data structure, which comprises multiple sub-caches for caching
/// different chain components, each with different time-to-live values.
///
Expand Down Expand Up @@ -84,20 +94,20 @@ impl Cache {
&self,
id: &PortChannelId,
f: F,
) -> Result<ChannelEnd, E>
) -> CacheResult<ChannelEnd, E>
where
F: FnOnce() -> Result<ChannelEnd, E>,
{
if let Some(chan) = self.channels.get(id) {
// If cache hit, return it.
Ok(chan)
Ok((chan, CacheStatus::Hit))
} else {
// Only cache a channel end if the channel is open.
let chan = f()?;
if chan.state().is_open() {
self.channels.insert(id.clone(), chan.clone());
}
Ok(chan)
Ok((chan, CacheStatus::Miss))
adizere marked this conversation as resolved.
Show resolved Hide resolved
}
}

Expand All @@ -109,18 +119,18 @@ impl Cache {
&self,
id: &ConnectionId,
f: F,
) -> Result<ConnectionEnd, E>
) -> CacheResult<ConnectionEnd, E>
where
F: FnOnce() -> Result<ConnectionEnd, E>,
{
if let Some(conn) = self.connections.get(id) {
Ok(conn)
Ok((conn, CacheStatus::Hit))
} else {
let conn = f()?;
if conn.state().is_open() {
self.connections.insert(id.clone(), conn.clone());
}
Ok(conn)
Ok((conn, CacheStatus::Miss))
}
}

Expand All @@ -132,16 +142,16 @@ impl Cache {
&self,
id: &ClientId,
f: F,
) -> Result<AnyClientState, E>
) -> CacheResult<AnyClientState, E>
where
F: FnOnce() -> Result<AnyClientState, E>,
{
if let Some(state) = self.client_states.get(id) {
Ok(state)
Ok((state, CacheStatus::Hit))
} else {
let state = f()?;
self.client_states.insert(id.clone(), state.clone());
Ok(state)
Ok((state, CacheStatus::Miss))
}
}

Expand All @@ -152,16 +162,16 @@ impl Cache {
///
/// This value is cached with a small time-to-live so that the latest height
/// query returns the same height if the same query is repeated within a small time frame.
pub fn get_or_try_update_latest_height_with<F, E>(&self, f: F) -> Result<Height, E>
pub fn get_or_try_update_latest_height_with<F, E>(&self, f: F) -> CacheResult<Height, E>
where
F: FnOnce() -> Result<Height, E>,
{
if let Some(height) = self.latest_height.get(&()) {
Ok(height)
Ok((height, CacheStatus::Hit))
} else {
let height = f()?;
self.latest_height.insert((), height);
Ok(height)
Ok((height, CacheStatus::Miss))
}
}
}
Expand Down
46 changes: 37 additions & 9 deletions relayer/src/chain/handle/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,13 @@ use ibc_proto::ibc::core::connection::v1::QueryClientConnectionsRequest;
use ibc_proto::ibc::core::connection::v1::QueryConnectionsRequest;
use serde::{Serialize, Serializer};

use crate::cache::Cache;
use crate::cache::{Cache, CacheStatus};
use crate::chain::handle::{ChainHandle, ChainRequest, Subscription};
use crate::chain::tx::TrackedMsgs;
use crate::chain::{HealthCheck, StatusResponse};
use crate::config::ChainConfig;
use crate::error::Error;
use crate::telemetry;
use crate::{connection::ConnectionMsgType, keyring::KeyEntry};

#[derive(Debug, Clone)]
Expand Down Expand Up @@ -131,8 +132,15 @@ impl<Handle: ChainHandle> ChainHandle for CachingChainHandle<Handle> {

fn query_latest_height(&self) -> Result<Height, Error> {
let handle = self.inner();
self.cache
.get_or_try_update_latest_height_with(|| handle.query_latest_height())
let (result, in_cache) = self
.cache
.get_or_try_update_latest_height_with(|| handle.query_latest_height())?;

if in_cache == CacheStatus::Hit {
telemetry!(query_cache_hit, &self.id(), "query_latest_height");
adizere marked this conversation as resolved.
Show resolved Hide resolved
}

Ok(result)
}

fn query_clients(
Expand All @@ -150,10 +158,17 @@ impl<Handle: ChainHandle> ChainHandle for CachingChainHandle<Handle> {
) -> Result<AnyClientState, Error> {
let handle = self.inner();
if height.is_zero() {
self.cache
let (result, in_cache) = self
.cache
.get_or_try_insert_client_state_with(client_id, || {
handle.query_client_state(client_id, height)
})
})?;

if in_cache == CacheStatus::Hit {
telemetry!(query_cache_hit, &self.id(), "query_client_state");
}

Ok(result)
} else {
handle.query_client_state(client_id, height)
}
Expand Down Expand Up @@ -212,10 +227,17 @@ impl<Handle: ChainHandle> ChainHandle for CachingChainHandle<Handle> {
) -> Result<ConnectionEnd, Error> {
let handle = self.inner();
if height.is_zero() {
self.cache
let (result, in_cache) = self
.cache
.get_or_try_insert_connection_with(connection_id, || {
handle.query_connection(connection_id, height)
})
})?;

if in_cache == CacheStatus::Hit {
telemetry!(query_cache_hit, &self.id(), "query_connection");
}

Ok(result)
} else {
handle.query_connection(connection_id, height)
}
Expand Down Expand Up @@ -257,10 +279,16 @@ impl<Handle: ChainHandle> ChainHandle for CachingChainHandle<Handle> {
) -> Result<ChannelEnd, Error> {
let handle = self.inner();
if height.is_zero() {
self.cache.get_or_try_insert_channel_with(
let (result, in_cache) = self.cache.get_or_try_insert_channel_with(
&PortChannelId::new(channel_id.clone(), port_id.clone()),
|| handle.query_channel(port_id, channel_id, height),
)
)?;

if in_cache == CacheStatus::Hit {
telemetry!(query_cache_hit, &self.id(), "query_connection");
}

Ok(result)
} else {
handle.query_channel(port_id, channel_id, height)
}
Expand Down
17 changes: 17 additions & 0 deletions telemetry/src/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,9 @@ pub struct TelemetryState {

/// Number of queries emitted by the relayer, per chain and query type
queries: Counter<u64>,

/// Number of cache hits for queries emitted by the relayer, per chain and query type
query_cache_hits: Counter<u64>,
}

impl TelemetryState {
Expand Down Expand Up @@ -144,6 +147,15 @@ impl TelemetryState {

self.queries.add(1, labels);
}

pub fn query_cache_hit(&self, chain_id: &ChainId, query_type: &'static str) {
let labels = &[
KeyValue::new("chain", chain_id.to_string()),
KeyValue::new("query_type", query_type),
];

self.query_cache_hits.add(1, labels);
}
}

impl Default for TelemetryState {
Expand Down Expand Up @@ -190,6 +202,11 @@ impl Default for TelemetryState {
"Number of queries emitted by the relayer, per chain and query type",
)
.init(),

query_cache_hits: meter
.u64_counter("cache_hits")
.with_description("Number of cache hits for queries emitted by the relayer, per chain and query type")
.init(),
}
}
}