Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: cursor metrics #4892

Open
wants to merge 10 commits into
base: main
Choose a base branch
from
65 changes: 65 additions & 0 deletions rust/main/hyperlane-base/src/contract_sync/cursors/metrics.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
use prometheus::IntGaugeVec;

use crate::CoreMetrics;

/// Struct encapsulating prometheus metrics used by SequenceAware and RateLimited cursors.
#[derive(Debug, Clone)]
pub struct CursorMetrics {
/// Current block of the cursor.
/// Used by both sequence aware and rate limited cursors.
/// Labels:
/// - `event_type`: the event type the cursor is indexing. Could be anything implementing `Indexable`.
/// - `chain`: Chain the cursor is collecting data from.
/// - `cursor_type`: The type of cursor. E.g. `forward_sequenced`, `backward_sequenced`, `forward_rate_limited` or `backward_rate_limited`.
Mantas-M marked this conversation as resolved.
Show resolved Hide resolved
pub cursor_current_block: IntGaugeVec,
Mantas-M marked this conversation as resolved.
Show resolved Hide resolved

/// Current sequence of the cursor.
/// Only used by sequence aware cursors.
/// Labels:
/// - `event_type`: the event type the cursor is indexing. Could be anything implementing `Indexable`.
/// - `chain`: Chain the cursor is collecting data from.
/// - `cursor_type`: The type of cursor. E.g. `forward_sequenced`, `backward_sequenced`, `forward_rate_limited` or `backward_rate_limited`.
Mantas-M marked this conversation as resolved.
Show resolved Hide resolved
Mantas-M marked this conversation as resolved.
Show resolved Hide resolved
pub cursor_current_sequence: IntGaugeVec,

/// Max sequence of the cursor.
/// Only used by sequence aware cursors.
/// Labels:
/// - `event_type`: the event type the cursor is indexing. Could be anything implementing `Indexable`.
/// - `chain`: Chain the cursor is collecting data from.
pub cursor_max_sequence: IntGaugeVec,
}

impl CursorMetrics {
/// Instantiate a new CursorMetrics object.
pub fn new(metrics: &CoreMetrics) -> Self {
let cursor_current_block = metrics
.new_int_gauge(
"cursor_current_block",
"Current block of the cursor",
&["event_type", "chain", "cursor_type"],
)
.expect("failed to register cursor_current_block metric");

let cursor_current_sequence = metrics
.new_int_gauge(
"cursor_current_sequence",
"Current sequence of the cursor",
&["event_type", "chain", "cursor_type"],
)
.expect("failed to register cursor_current_sequence metric");

let cursor_max_sequence = metrics
.new_int_gauge(
"cursor_max_sequence",
"Max sequence of the cursor",
&["event_type", "chain"],
)
.expect("failed to register cursor_max_sequence metric");

CursorMetrics {
cursor_current_block,
cursor_current_sequence,
cursor_max_sequence,
}
}
}
25 changes: 23 additions & 2 deletions rust/main/hyperlane-base/src/contract_sync/cursors/mod.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
pub(crate) mod sequence_aware;

use hyperlane_core::{
Delivery, HyperlaneDomainProtocol, HyperlaneMessage, InterchainGasPayment, MerkleTreeInsertion,
};

pub(crate) mod sequence_aware;
pub(crate) use sequence_aware::ForwardBackwardSequenceAwareSyncCursor;

pub(crate) mod rate_limited;
pub(crate) use rate_limited::RateLimitedContractSyncCursor;

pub(crate) mod metrics;
pub(crate) use metrics::CursorMetrics;

pub enum CursorType {
SequenceAware,
RateLimited,
Expand All @@ -24,6 +27,8 @@ pub trait Indexable {
fn broadcast_channel_size() -> Option<usize> {
None
}
/// Returns the name of the type for metrics.
fn name() -> &'static str;
}

impl Indexable for HyperlaneMessage {
Expand All @@ -40,6 +45,10 @@ impl Indexable for HyperlaneMessage {
fn broadcast_channel_size() -> Option<usize> {
TX_ID_CHANNEL_CAPACITY
}

fn name() -> &'static str {
"hyperlane_message"
}
}

impl Indexable for InterchainGasPayment {
Expand All @@ -51,6 +60,10 @@ impl Indexable for InterchainGasPayment {
HyperlaneDomainProtocol::Cosmos => CursorType::RateLimited,
}
}

fn name() -> &'static str {
"interchain_gas_payment"
}
}

impl Indexable for MerkleTreeInsertion {
Expand All @@ -62,6 +75,10 @@ impl Indexable for MerkleTreeInsertion {
HyperlaneDomainProtocol::Cosmos => CursorType::SequenceAware,
}
}

fn name() -> &'static str {
"merkle_tree_insertion"
}
}

impl Indexable for Delivery {
Expand All @@ -73,4 +90,8 @@ impl Indexable for Delivery {
HyperlaneDomainProtocol::Cosmos => CursorType::RateLimited,
}
}

fn name() -> &'static str {
"delivery"
}
}
109 changes: 89 additions & 20 deletions rust/main/hyperlane-base/src/contract_sync/cursors/rate_limited.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,16 @@ use std::{
use async_trait::async_trait;
use derive_new::new;
use eyre::Result;

use hyperlane_core::{
ContractSyncCursor, CursorAction, HyperlaneWatermarkedLogStore, Indexed, Indexer, LogMeta,
ContractSyncCursor, CursorAction, HyperlaneDomain, HyperlaneWatermarkedLogStore, Indexed,
Indexer, LogMeta,
};

use crate::contract_sync::eta_calculator::SyncerEtaCalculator;

use super::{CursorMetrics, Indexable};

/// Time window for the moving average used in the eta calculator in seconds.
const ETA_TIME_WINDOW: f64 = 2. * 60.;

Expand Down Expand Up @@ -83,12 +87,16 @@ pub(crate) struct RateLimitedContractSyncCursor<T> {
last_tip_update: Instant,
eta_calculator: SyncerEtaCalculator,
sync_state: SyncState,
metrics: Arc<CursorMetrics>,
domain: HyperlaneDomain,
}

impl<T> RateLimitedContractSyncCursor<T> {
impl<T: Indexable + Sync + Send + Debug + 'static> RateLimitedContractSyncCursor<T> {
/// Construct a new contract sync helper.
pub async fn new(
indexer: Arc<dyn Indexer<T>>,
metrics: Arc<CursorMetrics>,
domain: &HyperlaneDomain,
store: Arc<dyn HyperlaneWatermarkedLogStore<T>>,
chunk_size: u32,
initial_height: u32,
Expand All @@ -107,6 +115,8 @@ impl<T> RateLimitedContractSyncCursor<T> {
// The rate limited cursor currently only syncs in the forward direction.
SyncDirection::Forward,
),
metrics,
domain: domain.to_owned(),
})
}

Expand Down Expand Up @@ -155,12 +165,24 @@ impl<T> RateLimitedContractSyncCursor<T> {
Duration::from_secs(0)
}
}

async fn update_metrics(&self) {
let latest_block = self.latest_queried_block();
let chain_name = self.domain.name();
// The rate limited cursor currently only syncs in the forward direction.
let label_values = &[T::name(), chain_name, "forward_rate_limited"];

self.metrics
.cursor_current_block
.with_label_values(label_values)
.set(latest_block as i64);
}
}

#[async_trait]
impl<T> ContractSyncCursor<T> for RateLimitedContractSyncCursor<T>
where
T: Send + Sync + Debug + 'static,
T: Send + Sync + Debug + 'static + Indexable,
Mantas-M marked this conversation as resolved.
Show resolved Hide resolved
{
async fn next_action(&mut self) -> Result<(CursorAction, Duration)> {
let eta = self.sync_eta();
Expand All @@ -187,6 +209,7 @@ where
_: Vec<(Indexed<T>, LogMeta)>,
range: RangeInclusive<u32>,
) -> Result<()> {
self.update_metrics().await;
// Store a relatively conservative view of the high watermark, which should allow a single watermark to be
// safely shared across multiple cursors, so long as they are running sufficiently in sync
self.store
Expand Down Expand Up @@ -216,63 +239,106 @@ where
}
}

impl<T> Debug for RateLimitedContractSyncCursor<T> {
impl<T: Indexable> Debug for RateLimitedContractSyncCursor<T> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("RateLimitedContractSyncCursor")
.field("tip", &self.tip)
.field("last_tip_update", &self.last_tip_update)
.field("sync_state", &self.sync_state)
.field("domain", &self.domain)
.finish()
}
}

#[cfg(test)]
pub(crate) mod test {
use super::*;
use hyperlane_core::{ChainResult, HyperlaneLogStore};
use crate::cursors::CursorType;
use hyperlane_core::{ChainResult, HyperlaneDomainProtocol, HyperlaneLogStore};
use mockall::{self, Sequence};

const CHUNK_SIZE: u32 = 10;
const INITIAL_HEIGHT: u32 = 0;

#[derive(Debug, Clone)]
struct MockIndexable;

unsafe impl Sync for MockIndexable {}
unsafe impl Send for MockIndexable {}

impl Indexable for MockIndexable {
fn indexing_cursor(_domain: HyperlaneDomainProtocol) -> CursorType {
CursorType::RateLimited
}

fn name() -> &'static str {
"mock_indexable"
}
}

mockall::mock! {
pub Indexer {}
pub Indexer<T: Indexable + Send + Sync> {}

impl Debug for Indexer {
impl<T: Indexable + Send + Sync> Debug for Indexer<T> {
fn fmt<'a>(&self, f: &mut std::fmt::Formatter<'a>) -> std::fmt::Result;
}

#[async_trait]
impl Indexer<()> for Indexer {
async fn fetch_logs_in_range(&self, range: RangeInclusive<u32>) -> ChainResult<Vec<(hyperlane_core::Indexed<()> , LogMeta)>>;
impl<T: Indexable + Send + Sync> Indexer<T> for Indexer<T> {
async fn fetch_logs_in_range(&self, range: RangeInclusive<u32>) -> ChainResult<Vec<(hyperlane_core::Indexed<T>, LogMeta)>>;
async fn get_finalized_block_number(&self) -> ChainResult<u32>;
}
}

mockall::mock! {
pub Db {}
pub Db<T: Indexable + Send + Sync> {}

impl Debug for Db {
impl<T: Indexable + Send + Sync> Debug for Db<T> {
fn fmt<'a>(&self, f: &mut std::fmt::Formatter<'a>) -> std::fmt::Result;
}

#[async_trait]
impl HyperlaneLogStore<()> for Db {
async fn store_logs(&self, logs: &[(hyperlane_core::Indexed<()> , LogMeta)]) -> Result<u32>;
impl<T: Indexable + Send + Sync> HyperlaneLogStore<T> for Db<T> {
async fn store_logs(&self, logs: &[(hyperlane_core::Indexed<T>, LogMeta)]) -> Result<u32>;
}

#[async_trait]
impl HyperlaneWatermarkedLogStore<()> for Db {
impl<T: Indexable + Send + Sync> HyperlaneWatermarkedLogStore<T> for Db<T> {
async fn retrieve_high_watermark(&self) -> Result<Option<u32>>;
async fn store_high_watermark(&self, block_number: u32) -> Result<()>;
}
}

async fn mock_rate_limited_cursor(
fn mock_cursor_metrics() -> CursorMetrics {
CursorMetrics {
cursor_current_block: prometheus::IntGaugeVec::new(
prometheus::Opts::new("cursor_current_block", "Current block of the cursor")
.namespace("mock")
.subsystem("cursor"),
&["event_type", "chain", "cursor_type"],
)
.unwrap(),
cursor_current_sequence: prometheus::IntGaugeVec::new(
prometheus::Opts::new("cursor_current_sequence", "Current sequence of the cursor")
.namespace("mock")
.subsystem("cursor"),
&["event_type", "chain", "cursor_type"],
)
.unwrap(),
cursor_max_sequence: prometheus::IntGaugeVec::new(
prometheus::Opts::new("cursor_max_sequence", "Max sequence of the cursor")
.namespace("mock")
.subsystem("cursor"),
&["event_type", "chain"],
)
.unwrap(),
}
}
async fn mock_rate_limited_cursor<T: Indexable + Debug + Send + Sync + 'static>(
custom_chain_tips: Option<Vec<u32>>,
) -> RateLimitedContractSyncCursor<()> {
) -> RateLimitedContractSyncCursor<T> {
let mut seq = Sequence::new();
let mut indexer = MockIndexer::new();
let mut indexer = MockIndexer::<T>::new();
match custom_chain_tips {
Some(chain_tips) => {
for tip in chain_tips {
Expand All @@ -294,11 +360,14 @@ pub(crate) mod test {
}

let mut db = MockDb::new();
let metrics = mock_cursor_metrics();
db.expect_store_high_watermark().returning(|_| Ok(()));
let chunk_size = CHUNK_SIZE;
let initial_height = INITIAL_HEIGHT;
RateLimitedContractSyncCursor::new(
Arc::new(indexer),
Arc::new(metrics),
&HyperlaneDomain::new_test_domain("test"),
Arc::new(db),
chunk_size,
initial_height,
Expand All @@ -309,7 +378,7 @@ pub(crate) mod test {

#[tokio::test]
async fn test_next_action_retries_if_update_isnt_called() {
let mut cursor = mock_rate_limited_cursor(None).await;
let mut cursor = mock_rate_limited_cursor::<MockIndexable>(None).await;
let (action_1, _) = cursor.next_action().await.unwrap();
let (_action_2, _) = cursor.next_action().await.unwrap();

Expand All @@ -319,7 +388,7 @@ pub(crate) mod test {

#[tokio::test]
async fn test_next_action_changes_if_update_is_called() {
let mut cursor = mock_rate_limited_cursor(None).await;
let mut cursor = mock_rate_limited_cursor::<MockIndexable>(None).await;
let (action_1, _) = cursor.next_action().await.unwrap();

let range = match action_1 {
Expand All @@ -336,7 +405,7 @@ pub(crate) mod test {
#[tokio::test]
async fn test_next_action_sleeps_if_tip_is_not_updated() {
let chain_tips = vec![10];
let mut cursor = mock_rate_limited_cursor(Some(chain_tips)).await;
let mut cursor = mock_rate_limited_cursor::<MockIndexable>(Some(chain_tips)).await;
let (action, _) = cursor.next_action().await.unwrap();
assert!(matches!(action, CursorAction::Sleep(_)));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,11 @@ impl<T: Debug> BackwardSequenceAwareSyncCursor<T> {
}
}

/// Get the last indexed sequence or 0 if no logs have been indexed yet.
pub fn last_sequence(&self) -> u32 {
self.last_indexed_snapshot.sequence.unwrap_or(0)
}

/// Gets the next range of logs to query.
/// If the cursor is fully synced, this returns None.
/// Otherwise, it returns the next range to query, either by block or sequence depending on the mode.
Expand Down
Loading