Skip to content

Commit

Permalink
Add metric for query cache hits
Browse files Browse the repository at this point in the history
  • Loading branch information
romac committed Mar 23, 2022
1 parent 06cff57 commit d774001
Show file tree
Hide file tree
Showing 4 changed files with 84 additions and 23 deletions.
10 changes: 8 additions & 2 deletions e2e/run.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#!/usr/bin/env python3

import argparse
import requests
import logging as l
from typing import Tuple
from pathlib import Path
Expand Down Expand Up @@ -151,6 +152,11 @@ def passive_packets(

assert (len(unreceived) == 0), (unreceived,
"unreceived acks mismatch (expected 0)")

# Show metrics after workflow
res = requests.get('http://localhost:3001/metrics')
print(res.text)

# 9.Stop the relayer
proc.kill()

Expand Down Expand Up @@ -191,8 +197,8 @@ def raw(c: Config, ibc0: ChainId, ibc1: ChainId, port_id: PortId) -> Tuple[Clien

# The ChannelCloseInit message is currently denied by Gaia,
# and requires a patch to be accepted.
# channel.close(c, ibc0 , ibc1, ibc0_conn_id,
# ibc1_conn_id, ibc0_chan_id, ibc1_chan_id)
channel.close(c, ibc0, ibc1, ibc0_conn_id,
ibc1_conn_id, ibc0_chan_id, ibc1_chan_id)

return ibc0_client_id, ibc0_conn_id, ibc0_chan_id, ibc1_client_id, ibc1_conn_id, ibc1_chan_id

Expand Down
34 changes: 22 additions & 12 deletions relayer/src/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,16 @@ const CHANNEL_CACHE_CAPACITY: u64 = 10_000;
const CONNECTION_CACHE_CAPACITY: u64 = 10_000;
const CLIENT_STATE_CACHE_CAPACITY: u64 = 10_000;

/// Outcome of a cache lookup: whether the requested value was already
/// present in the cache (a hit) or was not (a miss).
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum CacheStatus {
    /// The value was found in the cache.
    Hit,
    /// The value was not found in the cache.
    Miss,
}

/// Result of a cache-backed lookup: on success, the value of type `A` paired
/// with the [`CacheStatus`] of that lookup; on failure, the error `E`.
pub type CacheResult<A, E> = Result<(A, CacheStatus), E>;

/// The main cache data structure, which comprises multiple sub-caches for caching
/// different chain components, each with different time-to-live values.
///
Expand Down Expand Up @@ -84,20 +94,20 @@ impl Cache {
&self,
id: &PortChannelId,
f: F,
) -> Result<ChannelEnd, E>
) -> CacheResult<ChannelEnd, E>
where
F: FnOnce() -> Result<ChannelEnd, E>,
{
if let Some(chan) = self.channels.get(id) {
// If cache hit, return it.
Ok(chan)
Ok((chan, CacheStatus::Hit))
} else {
// Only cache a channel end if the channel is open.
let chan = f()?;
if chan.state().is_open() {
self.channels.insert(id.clone(), chan.clone());
}
Ok(chan)
Ok((chan, CacheStatus::Miss))
}
}

Expand All @@ -109,18 +119,18 @@ impl Cache {
&self,
id: &ConnectionId,
f: F,
) -> Result<ConnectionEnd, E>
) -> CacheResult<ConnectionEnd, E>
where
F: FnOnce() -> Result<ConnectionEnd, E>,
{
if let Some(conn) = self.connections.get(id) {
Ok(conn)
Ok((conn, CacheStatus::Hit))
} else {
let conn = f()?;
if conn.state().is_open() {
self.connections.insert(id.clone(), conn.clone());
}
Ok(conn)
Ok((conn, CacheStatus::Miss))
}
}

Expand All @@ -132,16 +142,16 @@ impl Cache {
&self,
id: &ClientId,
f: F,
) -> Result<AnyClientState, E>
) -> CacheResult<AnyClientState, E>
where
F: FnOnce() -> Result<AnyClientState, E>,
{
if let Some(state) = self.client_states.get(id) {
Ok(state)
Ok((state, CacheStatus::Hit))
} else {
let state = f()?;
self.client_states.insert(id.clone(), state.clone());
Ok(state)
Ok((state, CacheStatus::Miss))
}
}

Expand All @@ -152,16 +162,16 @@ impl Cache {
///
/// This value is cached with a small time-to-live so that the latest height
/// query returns the same height if the same query is repeated within a small time frame.
pub fn get_or_try_update_latest_height_with<F, E>(&self, f: F) -> Result<Height, E>
pub fn get_or_try_update_latest_height_with<F, E>(&self, f: F) -> CacheResult<Height, E>
where
F: FnOnce() -> Result<Height, E>,
{
if let Some(height) = self.latest_height.get(&()) {
Ok(height)
Ok((height, CacheStatus::Hit))
} else {
let height = f()?;
self.latest_height.insert((), height);
Ok(height)
Ok((height, CacheStatus::Miss))
}
}
}
Expand Down
46 changes: 37 additions & 9 deletions relayer/src/chain/handle/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,13 @@ use ibc_proto::ibc::core::connection::v1::QueryClientConnectionsRequest;
use ibc_proto::ibc::core::connection::v1::QueryConnectionsRequest;
use serde::{Serialize, Serializer};

use crate::cache::Cache;
use crate::cache::{Cache, CacheStatus};
use crate::chain::handle::{ChainHandle, ChainRequest, Subscription};
use crate::chain::tx::TrackedMsgs;
use crate::chain::{HealthCheck, StatusResponse};
use crate::config::ChainConfig;
use crate::error::Error;
use crate::telemetry;
use crate::{connection::ConnectionMsgType, keyring::KeyEntry};

#[derive(Debug, Clone)]
Expand Down Expand Up @@ -131,8 +132,15 @@ impl<Handle: ChainHandle> ChainHandle for CachingChainHandle<Handle> {

fn query_latest_height(&self) -> Result<Height, Error> {
let handle = self.inner();
self.cache
.get_or_try_update_latest_height_with(|| handle.query_latest_height())
let (result, in_cache) = self
.cache
.get_or_try_update_latest_height_with(|| handle.query_latest_height())?;

if in_cache == CacheStatus::Hit {
telemetry!(query_cache_hit, &self.id(), "query_latest_height");
}

Ok(result)
}

fn query_clients(
Expand All @@ -150,10 +158,17 @@ impl<Handle: ChainHandle> ChainHandle for CachingChainHandle<Handle> {
) -> Result<AnyClientState, Error> {
let handle = self.inner();
if height.is_zero() {
self.cache
let (result, in_cache) = self
.cache
.get_or_try_insert_client_state_with(client_id, || {
handle.query_client_state(client_id, height)
})
})?;

if in_cache == CacheStatus::Hit {
telemetry!(query_cache_hit, &self.id(), "query_client_state");
}

Ok(result)
} else {
handle.query_client_state(client_id, height)
}
Expand Down Expand Up @@ -212,10 +227,17 @@ impl<Handle: ChainHandle> ChainHandle for CachingChainHandle<Handle> {
) -> Result<ConnectionEnd, Error> {
let handle = self.inner();
if height.is_zero() {
self.cache
let (result, in_cache) = self
.cache
.get_or_try_insert_connection_with(connection_id, || {
handle.query_connection(connection_id, height)
})
})?;

if in_cache == CacheStatus::Hit {
telemetry!(query_cache_hit, &self.id(), "query_connection");
}

Ok(result)
} else {
handle.query_connection(connection_id, height)
}
Expand Down Expand Up @@ -257,10 +279,16 @@ impl<Handle: ChainHandle> ChainHandle for CachingChainHandle<Handle> {
) -> Result<ChannelEnd, Error> {
let handle = self.inner();
if height.is_zero() {
self.cache.get_or_try_insert_channel_with(
let (result, in_cache) = self.cache.get_or_try_insert_channel_with(
&PortChannelId::new(channel_id.clone(), port_id.clone()),
|| handle.query_channel(port_id, channel_id, height),
)
)?;

if in_cache == CacheStatus::Hit {
    // The label must name this query (`query_channel`); with the connection
    // label, channel cache hits would be counted under the wrong query type.
    telemetry!(query_cache_hit, &self.id(), "query_channel");
}

Ok(result)
} else {
handle.query_channel(port_id, channel_id, height)
}
Expand Down
17 changes: 17 additions & 0 deletions telemetry/src/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,9 @@ pub struct TelemetryState {

/// Number of queries emitted by the relayer, per chain and query type
queries: Counter<u64>,

/// Number of cache hits for queries emitted by the relayer, per chain and query type
query_cache_hits: Counter<u64>,
}

impl TelemetryState {
Expand Down Expand Up @@ -144,6 +147,15 @@ impl TelemetryState {

self.queries.add(1, labels);
}

/// Adds one to the `query_cache_hits` counter, labelling the data point
/// with the chain identifier (`chain`) and the kind of query (`query_type`).
pub fn query_cache_hit(&self, chain_id: &ChainId, query_type: &'static str) {
    let chain = KeyValue::new("chain", chain_id.to_string());
    let kind = KeyValue::new("query_type", query_type);

    self.query_cache_hits.add(1, &[chain, kind]);
}
}

impl Default for TelemetryState {
Expand Down Expand Up @@ -190,6 +202,11 @@ impl Default for TelemetryState {
"Number of queries emitted by the relayer, per chain and query type",
)
.init(),

query_cache_hits: meter
.u64_counter("cache_hits")
.with_description("Number of cache hits for queries emitted by the relayer, per chain and query type")
.init(),
}
}
}

0 comments on commit d774001

Please sign in to comment.