From 98cdb4a3da718140817e5e79103e08fc72435fa8 Mon Sep 17 00:00:00 2001 From: "Mantas M." Date: Sun, 24 Nov 2024 13:05:52 +0100 Subject: [PATCH 1/7] feat(cursor-metrics): forward-backward cursor metrics --- .../src/contract_sync/cursors/mod.rs | 18 ++++++ .../cursors/sequence_aware/backward.rs | 5 ++ .../cursors/sequence_aware/forward.rs | 16 +++++ .../cursors/sequence_aware/metrics.rs | 61 +++++++++++++++++++ .../cursors/sequence_aware/mod.rs | 53 +++++++++++++++- .../src/contract_sync/metrics.rs | 11 +++- .../hyperlane-base/src/contract_sync/mod.rs | 2 + 7 files changed, 162 insertions(+), 4 deletions(-) create mode 100644 rust/main/hyperlane-base/src/contract_sync/cursors/sequence_aware/metrics.rs diff --git a/rust/main/hyperlane-base/src/contract_sync/cursors/mod.rs b/rust/main/hyperlane-base/src/contract_sync/cursors/mod.rs index 563d0fcc74..0846119195 100644 --- a/rust/main/hyperlane-base/src/contract_sync/cursors/mod.rs +++ b/rust/main/hyperlane-base/src/contract_sync/cursors/mod.rs @@ -24,6 +24,8 @@ pub trait Indexable { fn broadcast_channel_size() -> Option { None } + /// Returns the name of the type for metrics. + fn name() -> &'static str; } impl Indexable for HyperlaneMessage { @@ -40,6 +42,10 @@ impl Indexable for HyperlaneMessage { fn broadcast_channel_size() -> Option { TX_ID_CHANNEL_CAPACITY } + + fn name() -> &'static str { + "HyperlaneMessage" + } } impl Indexable for InterchainGasPayment { @@ -51,6 +57,10 @@ impl Indexable for InterchainGasPayment { HyperlaneDomainProtocol::Cosmos => CursorType::RateLimited, } } + + fn name() -> &'static str { + "InterchainGasPayment" + } } impl Indexable for MerkleTreeInsertion { @@ -62,6 +72,10 @@ impl Indexable for MerkleTreeInsertion { HyperlaneDomainProtocol::Cosmos => CursorType::SequenceAware, } } + + fn name() -> &'static str { + "MerkleTreeInsertion" + } } impl Indexable for Delivery { @@ -73,4 +87,8 @@ impl Indexable for Delivery { HyperlaneDomainProtocol::Cosmos => CursorType::RateLimited, } } + + fn name() -> &'static str { + "Delivery" + } } diff --git a/rust/main/hyperlane-base/src/contract_sync/cursors/sequence_aware/backward.rs b/rust/main/hyperlane-base/src/contract_sync/cursors/sequence_aware/backward.rs index 179ae3dd3c..866ae61fc7 100644 --- a/rust/main/hyperlane-base/src/contract_sync/cursors/sequence_aware/backward.rs +++ b/rust/main/hyperlane-base/src/contract_sync/cursors/sequence_aware/backward.rs @@ -76,6 +76,11 @@ impl BackwardSequenceAwareSyncCursor { } } + /// Get the last indexed sequence or 0 if no logs have been indexed yet. + pub fn last_sequence(&self) -> u32 { + self.last_indexed_snapshot.sequence.unwrap_or(0) + } + /// Gets the next range of logs to query. /// If the cursor is fully synced, this returns None. /// Otherwise, it returns the next range to query, either by block or sequence depending on the mode. diff --git a/rust/main/hyperlane-base/src/contract_sync/cursors/sequence_aware/forward.rs b/rust/main/hyperlane-base/src/contract_sync/cursors/sequence_aware/forward.rs index 967f4e6053..0aa8620f1c 100644 --- a/rust/main/hyperlane-base/src/contract_sync/cursors/sequence_aware/forward.rs +++ b/rust/main/hyperlane-base/src/contract_sync/cursors/sequence_aware/forward.rs @@ -88,6 +88,22 @@ impl ForwardSequenceAwareSyncCursor { } } + /// Get target sequence or return 0 if request failed + pub async fn target_sequence(&self) -> u32 { + let (count, _) = self + .latest_sequence_querier + .latest_sequence_count_and_tip() + .await + .ok() + .unwrap_or((None, 0)); + count.unwrap_or(0).saturating_sub(1) + } + + /// Get the last indexed sequence or 0 if no logs have been indexed yet. + pub fn last_sequence(&self) -> u32 { + self.last_indexed_snapshot.sequence.unwrap_or(0) + } + /// Gets the next range of logs to index. /// If there are no logs to index, returns `None`. /// If there are logs to index, returns the range of logs, either by sequence or block number diff --git a/rust/main/hyperlane-base/src/contract_sync/cursors/sequence_aware/metrics.rs b/rust/main/hyperlane-base/src/contract_sync/cursors/sequence_aware/metrics.rs new file mode 100644 index 0000000000..d2fc4f1db1 --- /dev/null +++ b/rust/main/hyperlane-base/src/contract_sync/cursors/sequence_aware/metrics.rs @@ -0,0 +1,61 @@ +use crate::CoreMetrics; +use prometheus::IntGaugeVec; + +/// Struct encapsulating prometheus metrics used by the ForwardBackwardCursorMetrics. +#[derive(Debug, Clone)] +pub struct ForwardBackwardCursorMetrics { + /// Current block of the cursor. + /// Labels: + /// - `event_type`: the event type the cursor is indexing. Could be anything implementing `Indexable`. + /// - `chain`: Chain the cursor is collecting data from. + /// - `cursor_type`: The type of cursor. E.g. `ForwardSequenced` or `BackwardSequenced`. + pub cursor_current_block: IntGaugeVec, + + /// Current sequence of the cursor. + /// Labels: + /// - `event_type`: the event type the cursor is indexing. Could be anything implementing `Indexable`. + /// - `chain`: Chain the cursor is collecting data from. + /// - `cursor_type`: The type of cursor. E.g. `ForwardSequenced` or `BackwardSequenced`. + pub cursor_current_sequence: IntGaugeVec, + + /// Max sequence of the cursor. + /// Labels: + /// - `event_type`: the event type the cursor is indexing. Could be anything implementing `Indexable`. + /// - `chain`: Chain the cursor is collecting data from. + pub cursor_max_sequence: IntGaugeVec, +} + +impl ForwardBackwardCursorMetrics { + /// Instantiate a new ForwardBackwardCursorMetrics object. + pub fn new(metrics: &CoreMetrics) -> Self { + let cursor_current_block = metrics + .new_int_gauge( + "cursor_current_block", + "Current block of the cursor", + &["event_type", "chain", "cursor_type"], + ) + .expect("failed to register cursor_current_block metric"); + + let cursor_current_sequence = metrics + .new_int_gauge( + "cursor_current_sequence", + "Current sequence of the cursor", + &["event_type", "chain", "cursor_type"], + ) + .expect("failed to register cursor_current_sequence metric"); + + let cursor_max_sequence = metrics + .new_int_gauge( + "cursor_max_sequence", + "Max sequence of the cursor", + &["event_type", "chain"], + ) + .expect("failed to register cursor_max_sequence metric"); + + ForwardBackwardCursorMetrics { + cursor_current_block, + cursor_current_sequence, + cursor_max_sequence, + } + } +} diff --git a/rust/main/hyperlane-base/src/contract_sync/cursors/sequence_aware/mod.rs b/rust/main/hyperlane-base/src/contract_sync/cursors/sequence_aware/mod.rs index 74e7ebe014..c524cf525f 100644 --- a/rust/main/hyperlane-base/src/contract_sync/cursors/sequence_aware/mod.rs +++ b/rust/main/hyperlane-base/src/contract_sync/cursors/sequence_aware/mod.rs @@ -3,16 +3,20 @@ use std::{fmt::Debug, sync::Arc, time::Duration}; use async_trait::async_trait; use eyre::Result; use hyperlane_core::{ - ChainCommunicationError, ContractSyncCursor, CursorAction, + ChainCommunicationError, ContractSyncCursor, CursorAction, HyperlaneDomain, HyperlaneSequenceAwareIndexerStoreReader, IndexMode, Indexed, LogMeta, SequenceAwareIndexer, }; use std::ops::RangeInclusive; mod backward; mod forward; +mod metrics; pub(crate) use backward::BackwardSequenceAwareSyncCursor; pub(crate) use forward::ForwardSequenceAwareSyncCursor; +pub(crate) use metrics::ForwardBackwardCursorMetrics; + +use super::Indexable; #[derive(Debug, Clone, PartialEq, Eq)] struct LastIndexedSnapshot { @@ -67,11 +71,17 @@ pub(crate) struct ForwardBackwardSequenceAwareSyncCursor { forward: ForwardSequenceAwareSyncCursor, backward: BackwardSequenceAwareSyncCursor, last_direction: SyncDirection, + metrics: ForwardBackwardCursorMetrics, + domain: HyperlaneDomain, } -impl ForwardBackwardSequenceAwareSyncCursor { +impl + ForwardBackwardSequenceAwareSyncCursor +{ /// Construct a new contract sync helper. pub async fn new( + domain: &HyperlaneDomain, + metrics: ForwardBackwardCursorMetrics, latest_sequence_querier: Arc>, store: Arc>, chunk_size: u32, @@ -97,12 +107,48 @@ impl ForwardBackwardSequenceAwareSyncCursor { forward: forward_cursor, backward: backward_cursor, last_direction: SyncDirection::Forward, + metrics, + domain: domain.to_owned(), }) } + + async fn update_metrics(&self) { + let (cursor_type, latest_block, sequence) = match self.last_direction { + SyncDirection::Forward => ( + "ForwardSequenced", + self.forward.latest_queried_block(), + self.forward.last_sequence(), + ), + SyncDirection::Backward => ( + "BackwardSequenced", + self.backward.latest_queried_block(), + self.backward.last_sequence(), + ), + }; + + let chain_name = self.domain.name(); + let label_values = &[T::name(), chain_name, cursor_type]; + + self.metrics + .cursor_current_block + .with_label_values(label_values) + .set(latest_block as i64); + + self.metrics + .cursor_current_sequence + .with_label_values(label_values) + .set(sequence as i64); + + let max_sequence = self.forward.target_sequence().await as i64; + self.metrics + .cursor_max_sequence + .with_label_values(&[T::name(), chain_name]) + .set(max_sequence); + } } #[async_trait] -impl ContractSyncCursor +impl ContractSyncCursor for ForwardBackwardSequenceAwareSyncCursor { async fn next_action(&mut self) -> Result<(CursorAction, Duration)> { @@ -131,6 +177,7 @@ impl ContractSyncCursor logs: Vec<(Indexed, LogMeta)>, range: RangeInclusive, ) -> Result<()> { + self.update_metrics().await; match self.last_direction { SyncDirection::Forward => self.forward.update(logs, range).await, SyncDirection::Backward => self.backward.update(logs, range).await, diff --git a/rust/main/hyperlane-base/src/contract_sync/metrics.rs b/rust/main/hyperlane-base/src/contract_sync/metrics.rs index 30f3fd02bf..ed680a73df 100644 --- a/rust/main/hyperlane-base/src/contract_sync/metrics.rs +++ b/rust/main/hyperlane-base/src/contract_sync/metrics.rs @@ -1,6 +1,9 @@ -use crate::CoreMetrics; use prometheus::{IntCounterVec, IntGaugeVec}; +use crate::CoreMetrics; + +use super::sequence_aware::ForwardBackwardCursorMetrics; + /// Struct encapsulating prometheus metrics used by the ContractSync. #[derive(Debug, Clone)] pub struct ContractSyncMetrics { @@ -20,6 +23,9 @@ pub struct ContractSyncMetrics { /// See `last_known_message_nonce` in CoreMetrics. pub message_nonce: IntGaugeVec, + + /// Metrics for forward and backward cursors. + pub forward_backward_cursor_metrics: ForwardBackwardCursorMetrics, } impl ContractSyncMetrics { @@ -43,10 +49,13 @@ impl ContractSyncMetrics { let message_nonce = metrics.last_known_message_nonce(); + let forward_backward_cursor_metrics = ForwardBackwardCursorMetrics::new(metrics); + ContractSyncMetrics { indexed_height, stored_events, message_nonce, + forward_backward_cursor_metrics, } } } diff --git a/rust/main/hyperlane-base/src/contract_sync/mod.rs b/rust/main/hyperlane-base/src/contract_sync/mod.rs index df9563d8a7..1fcf72a47a 100644 --- a/rust/main/hyperlane-base/src/contract_sync/mod.rs +++ b/rust/main/hyperlane-base/src/contract_sync/mod.rs @@ -352,6 +352,8 @@ where ) -> Result>> { Ok(Box::new( ForwardBackwardSequenceAwareSyncCursor::new( + self.domain(), + self.metrics.forward_backward_cursor_metrics.clone(), self.indexer.clone(), Arc::new(self.store.clone()), index_settings.chunk_size, From baaf0f67417c958c3e2c4e09a300a3c1afd62b4a Mon Sep 17 00:00:00 2001 From: "Mantas M." Date: Wed, 27 Nov 2024 19:53:03 +0100 Subject: [PATCH 2/7] feat(cursor-metrics): make metrics generic and usable by both cursor types --- .../cursors/{sequence_aware => }/metrics.rs | 20 ++-- .../src/contract_sync/cursors/mod.rs | 15 ++- .../src/contract_sync/cursors/rate_limited.rs | 109 ++++++++++++++---- .../cursors/sequence_aware/mod.rs | 15 ++- .../src/contract_sync/metrics.rs | 13 ++- .../hyperlane-base/src/contract_sync/mod.rs | 4 +- 6 files changed, 127 insertions(+), 49 deletions(-) rename rust/main/hyperlane-base/src/contract_sync/cursors/{sequence_aware => }/metrics.rs (74%) diff --git a/rust/main/hyperlane-base/src/contract_sync/cursors/sequence_aware/metrics.rs b/rust/main/hyperlane-base/src/contract_sync/cursors/metrics.rs similarity index 74% rename from rust/main/hyperlane-base/src/contract_sync/cursors/sequence_aware/metrics.rs rename to rust/main/hyperlane-base/src/contract_sync/cursors/metrics.rs index d2fc4f1db1..6475ab2c82 100644 --- a/rust/main/hyperlane-base/src/contract_sync/cursors/sequence_aware/metrics.rs +++ b/rust/main/hyperlane-base/src/contract_sync/cursors/metrics.rs @@ -1,32 +1,36 @@ -use crate::CoreMetrics; use prometheus::IntGaugeVec; -/// Struct encapsulating prometheus metrics used by the ForwardBackwardCursorMetrics. +use crate::CoreMetrics; + +/// Struct encapsulating prometheus metrics used by SequenceAware and RateLimited cursors. #[derive(Debug, Clone)] -pub struct ForwardBackwardCursorMetrics { +pub struct CursorMetrics { /// Current block of the cursor. + /// Used by both sequence aware and rate limited cursors. /// Labels: /// - `event_type`: the event type the cursor is indexing. Could be anything implementing `Indexable`. /// - `chain`: Chain the cursor is collecting data from. - /// - `cursor_type`: The type of cursor. E.g. `ForwardSequenced` or `BackwardSequenced`. + /// - `cursor_type`: The type of cursor. E.g. `forward_sequenced`, `backward_sequenced`, `forward_rate_limited` or `backward_rate_limited`. pub cursor_current_block: IntGaugeVec, /// Current sequence of the cursor. + /// Only used by sequence aware cursors. /// Labels: /// - `event_type`: the event type the cursor is indexing. Could be anything implementing `Indexable`. /// - `chain`: Chain the cursor is collecting data from. - /// - `cursor_type`: The type of cursor. E.g. `ForwardSequenced` or `BackwardSequenced`. + /// - `cursor_type`: The type of cursor. E.g. `forward_sequenced`, `backward_sequenced`, `forward_rate_limited` or `backward_rate_limited`. pub cursor_current_sequence: IntGaugeVec, /// Max sequence of the cursor. + /// Only used by sequence aware cursors. /// Labels: /// - `event_type`: the event type the cursor is indexing. Could be anything implementing `Indexable`. /// - `chain`: Chain the cursor is collecting data from. pub cursor_max_sequence: IntGaugeVec, } -impl ForwardBackwardCursorMetrics { - /// Instantiate a new ForwardBackwardCursorMetrics object. +impl CursorMetrics { + /// Instantiate a new CursorMetrics object. pub fn new(metrics: &CoreMetrics) -> Self { let cursor_current_block = metrics .new_int_gauge( @@ -52,7 +56,7 @@ impl ForwardBackwardCursorMetrics { ) .expect("failed to register cursor_max_sequence metric"); - ForwardBackwardCursorMetrics { + CursorMetrics { cursor_current_block, cursor_current_sequence, cursor_max_sequence, diff --git a/rust/main/hyperlane-base/src/contract_sync/cursors/mod.rs b/rust/main/hyperlane-base/src/contract_sync/cursors/mod.rs index 0846119195..2fb36453e4 100644 --- a/rust/main/hyperlane-base/src/contract_sync/cursors/mod.rs +++ b/rust/main/hyperlane-base/src/contract_sync/cursors/mod.rs @@ -1,13 +1,16 @@ -pub(crate) mod sequence_aware; - use hyperlane_core::{ Delivery, HyperlaneDomainProtocol, HyperlaneMessage, InterchainGasPayment, MerkleTreeInsertion, }; + +pub(crate) mod sequence_aware; pub(crate) use sequence_aware::ForwardBackwardSequenceAwareSyncCursor; pub(crate) mod rate_limited; pub(crate) use rate_limited::RateLimitedContractSyncCursor; +pub(crate) mod metrics; +pub(crate) use metrics::CursorMetrics; + pub enum CursorType { SequenceAware, RateLimited, @@ -44,7 +47,7 @@ impl Indexable for HyperlaneMessage { } fn name() -> &'static str { - "HyperlaneMessage" + "hyperlane_message" } } @@ -59,7 +62,7 @@ impl Indexable for InterchainGasPayment { } fn name() -> &'static str { - "InterchainGasPayment" + "interchain_gas_payment" } } @@ -74,7 +77,7 @@ impl Indexable for MerkleTreeInsertion { } fn name() -> &'static str { - "MerkleTreeInsertion" + "merkle_tree_insertion" } } @@ -89,6 +92,6 @@ impl Indexable for Delivery { } fn name() -> &'static str { - "Delivery" + "delivery" } } diff --git a/rust/main/hyperlane-base/src/contract_sync/cursors/rate_limited.rs b/rust/main/hyperlane-base/src/contract_sync/cursors/rate_limited.rs index 9428d6bfd4..a654c13bad 100644 --- a/rust/main/hyperlane-base/src/contract_sync/cursors/rate_limited.rs +++ b/rust/main/hyperlane-base/src/contract_sync/cursors/rate_limited.rs @@ -8,12 +8,16 @@ use std::{ use async_trait::async_trait; use derive_new::new; use eyre::Result; + use hyperlane_core::{ - ContractSyncCursor, CursorAction, HyperlaneWatermarkedLogStore, Indexed, Indexer, LogMeta, + ContractSyncCursor, CursorAction, HyperlaneDomain, HyperlaneWatermarkedLogStore, Indexed, + Indexer, LogMeta, }; use crate::contract_sync::eta_calculator::SyncerEtaCalculator; +use super::{CursorMetrics, Indexable}; + /// Time window for the moving average used in the eta calculator in seconds. const ETA_TIME_WINDOW: f64 = 2. * 60.; @@ -83,12 +87,16 @@ pub(crate) struct RateLimitedContractSyncCursor { last_tip_update: Instant, eta_calculator: SyncerEtaCalculator, sync_state: SyncState, + metrics: Arc, + domain: HyperlaneDomain, } -impl RateLimitedContractSyncCursor { +impl RateLimitedContractSyncCursor { /// Construct a new contract sync helper. pub async fn new( indexer: Arc>, + metrics: Arc, + domain: &HyperlaneDomain, store: Arc>, chunk_size: u32, initial_height: u32, @@ -107,6 +115,8 @@ impl RateLimitedContractSyncCursor { // The rate limited cursor currently only syncs in the forward direction. SyncDirection::Forward, ), + metrics, + domain: domain.to_owned(), }) } @@ -155,12 +165,24 @@ impl RateLimitedContractSyncCursor { Duration::from_secs(0) } } + + async fn update_metrics(&self) { + let latest_block = self.latest_queried_block(); + let chain_name = self.domain.name(); + // The rate limited cursor currently only syncs in the forward direction. + let label_values = &[T::name(), chain_name, "forward_rate_limited"]; + + self.metrics + .cursor_current_block + .with_label_values(label_values) + .set(latest_block as i64); + } } #[async_trait] impl ContractSyncCursor for RateLimitedContractSyncCursor where - T: Send + Sync + Debug + 'static, + T: Send + Sync + Debug + 'static + Indexable, { async fn next_action(&mut self) -> Result<(CursorAction, Duration)> { let eta = self.sync_eta(); @@ -187,6 +209,7 @@ where _: Vec<(Indexed, LogMeta)>, range: RangeInclusive, ) -> Result<()> { + self.update_metrics().await; // Store a relatively conservative view of the high watermark, which should allow a single watermark to be // safely shared across multiple cursors, so long as they are running sufficiently in sync self.store @@ -216,12 +239,13 @@ where } } -impl Debug for RateLimitedContractSyncCursor { +impl Debug for RateLimitedContractSyncCursor { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("RateLimitedContractSyncCursor") .field("tip", &self.tip) .field("last_tip_update", &self.last_tip_update) .field("sync_state", &self.sync_state) + .field("domain", &self.domain) .finish() } } @@ -229,50 +253,92 @@ impl Debug for RateLimitedContractSyncCursor { #[cfg(test)] pub(crate) mod test { use super::*; - use hyperlane_core::{ChainResult, HyperlaneLogStore}; + use crate::cursors::CursorType; + use hyperlane_core::{ChainResult, HyperlaneDomainProtocol, HyperlaneLogStore}; use mockall::{self, Sequence}; const CHUNK_SIZE: u32 = 10; const INITIAL_HEIGHT: u32 = 0; + #[derive(Debug, Clone)] + struct MockIndexable; + + unsafe impl Sync for MockIndexable {} + unsafe impl Send for MockIndexable {} + + impl Indexable for MockIndexable { + fn indexing_cursor(_domain: HyperlaneDomainProtocol) -> CursorType { + CursorType::RateLimited + } + + fn name() -> &'static str { + "mock_indexable" + } + } + mockall::mock! { - pub Indexer {} + pub Indexer {} - impl Debug for Indexer { + impl Debug for Indexer { fn fmt<'a>(&self, f: &mut std::fmt::Formatter<'a>) -> std::fmt::Result; } #[async_trait] - impl Indexer<()> for Indexer { - async fn fetch_logs_in_range(&self, range: RangeInclusive) -> ChainResult , LogMeta)>>; + impl Indexer for Indexer { + async fn fetch_logs_in_range(&self, range: RangeInclusive) -> ChainResult, LogMeta)>>; async fn get_finalized_block_number(&self) -> ChainResult; } } mockall::mock! { - pub Db {} + pub Db {} - impl Debug for Db { + impl Debug for Db { fn fmt<'a>(&self, f: &mut std::fmt::Formatter<'a>) -> std::fmt::Result; } #[async_trait] - impl HyperlaneLogStore<()> for Db { - async fn store_logs(&self, logs: &[(hyperlane_core::Indexed<()> , LogMeta)]) -> Result; + impl HyperlaneLogStore for Db { + async fn store_logs(&self, logs: &[(hyperlane_core::Indexed, LogMeta)]) -> Result; } #[async_trait] - impl HyperlaneWatermarkedLogStore<()> for Db { + impl HyperlaneWatermarkedLogStore for Db { async fn retrieve_high_watermark(&self) -> Result>; async fn store_high_watermark(&self, block_number: u32) -> Result<()>; } } - async fn mock_rate_limited_cursor( + fn mock_cursor_metrics() -> CursorMetrics { + CursorMetrics { + cursor_current_block: prometheus::IntGaugeVec::new( + prometheus::Opts::new("cursor_current_block", "Current block of the cursor") + .namespace("mock") + .subsystem("cursor"), + &["event_type", "chain", "cursor_type"], + ) + .unwrap(), + cursor_current_sequence: prometheus::IntGaugeVec::new( + prometheus::Opts::new("cursor_current_sequence", "Current sequence of the cursor") + .namespace("mock") + .subsystem("cursor"), + &["event_type", "chain", "cursor_type"], + ) + .unwrap(), + cursor_max_sequence: prometheus::IntGaugeVec::new( + prometheus::Opts::new("cursor_max_sequence", "Max sequence of the cursor") + .namespace("mock") + .subsystem("cursor"), + &["event_type", "chain"], + ) + .unwrap(), + } + } + async fn mock_rate_limited_cursor( custom_chain_tips: Option>, - ) -> RateLimitedContractSyncCursor<()> { + ) -> RateLimitedContractSyncCursor { let mut seq = Sequence::new(); - let mut indexer = MockIndexer::new(); + let mut indexer = MockIndexer::::new(); match custom_chain_tips { Some(chain_tips) => { for tip in chain_tips { @@ -294,11 +360,14 @@ pub(crate) mod test { } let mut db = MockDb::new(); + let metrics = mock_cursor_metrics(); db.expect_store_high_watermark().returning(|_| Ok(())); let chunk_size = CHUNK_SIZE; let initial_height = INITIAL_HEIGHT; RateLimitedContractSyncCursor::new( Arc::new(indexer), + Arc::new(metrics), + &HyperlaneDomain::new_test_domain("test"), Arc::new(db), chunk_size, initial_height, @@ -309,7 +378,7 @@ pub(crate) mod test { #[tokio::test] async fn test_next_action_retries_if_update_isnt_called() { - let mut cursor = mock_rate_limited_cursor(None).await; + let mut cursor = mock_rate_limited_cursor::(None).await; let (action_1, _) = cursor.next_action().await.unwrap(); let (_action_2, _) = cursor.next_action().await.unwrap(); @@ -319,7 +388,7 @@ pub(crate) mod test { #[tokio::test] async fn test_next_action_changes_if_update_is_called() { - let mut cursor = mock_rate_limited_cursor(None).await; + let mut cursor = mock_rate_limited_cursor::(None).await; let (action_1, _) = cursor.next_action().await.unwrap(); let range = match action_1 { @@ -336,7 +405,7 @@ pub(crate) mod test { #[tokio::test] async fn test_next_action_sleeps_if_tip_is_not_updated() { let chain_tips = vec![10]; - let mut cursor = mock_rate_limited_cursor(Some(chain_tips)).await; + let mut cursor = mock_rate_limited_cursor::(Some(chain_tips)).await; let (action, _) = cursor.next_action().await.unwrap(); assert!(matches!(action, CursorAction::Sleep(_))); } diff --git a/rust/main/hyperlane-base/src/contract_sync/cursors/sequence_aware/mod.rs b/rust/main/hyperlane-base/src/contract_sync/cursors/sequence_aware/mod.rs index c524cf525f..899a0be781 100644 --- a/rust/main/hyperlane-base/src/contract_sync/cursors/sequence_aware/mod.rs +++ b/rust/main/hyperlane-base/src/contract_sync/cursors/sequence_aware/mod.rs @@ -1,22 +1,21 @@ +use std::ops::RangeInclusive; use std::{fmt::Debug, sync::Arc, time::Duration}; use async_trait::async_trait; use eyre::Result; + use hyperlane_core::{ ChainCommunicationError, ContractSyncCursor, CursorAction, HyperlaneDomain, HyperlaneSequenceAwareIndexerStoreReader, IndexMode, Indexed, LogMeta, SequenceAwareIndexer, }; -use std::ops::RangeInclusive; mod backward; mod forward; -mod metrics; pub(crate) use backward::BackwardSequenceAwareSyncCursor; pub(crate) use forward::ForwardSequenceAwareSyncCursor; -pub(crate) use metrics::ForwardBackwardCursorMetrics; -use super::Indexable; +use super::{CursorMetrics, Indexable}; #[derive(Debug, Clone, PartialEq, Eq)] struct LastIndexedSnapshot { @@ -71,7 +70,7 @@ pub(crate) struct ForwardBackwardSequenceAwareSyncCursor { forward: ForwardSequenceAwareSyncCursor, backward: BackwardSequenceAwareSyncCursor, last_direction: SyncDirection, - metrics: ForwardBackwardCursorMetrics, + metrics: Arc, domain: HyperlaneDomain, } @@ -81,7 +80,7 @@ impl /// Construct a new contract sync helper. pub async fn new( domain: &HyperlaneDomain, - metrics: ForwardBackwardCursorMetrics, + metrics: Arc, latest_sequence_querier: Arc>, store: Arc>, chunk_size: u32, @@ -115,12 +114,12 @@ impl async fn update_metrics(&self) { let (cursor_type, latest_block, sequence) = match self.last_direction { SyncDirection::Forward => ( - "ForwardSequenced", + "forward_sequenced", self.forward.latest_queried_block(), self.forward.last_sequence(), ), SyncDirection::Backward => ( - "BackwardSequenced", + "backward_sequenced", self.backward.latest_queried_block(), self.backward.last_sequence(), ), diff --git a/rust/main/hyperlane-base/src/contract_sync/metrics.rs b/rust/main/hyperlane-base/src/contract_sync/metrics.rs index ed680a73df..54e3c7a2cb 100644 --- a/rust/main/hyperlane-base/src/contract_sync/metrics.rs +++ b/rust/main/hyperlane-base/src/contract_sync/metrics.rs @@ -1,8 +1,10 @@ +use std::sync::Arc; + use prometheus::{IntCounterVec, IntGaugeVec}; use crate::CoreMetrics; -use super::sequence_aware::ForwardBackwardCursorMetrics; +use super::cursors::CursorMetrics; /// Struct encapsulating prometheus metrics used by the ContractSync. #[derive(Debug, Clone)] @@ -24,8 +26,8 @@ pub struct ContractSyncMetrics { /// See `last_known_message_nonce` in CoreMetrics. pub message_nonce: IntGaugeVec, - /// Metrics for forward and backward cursors. - pub forward_backward_cursor_metrics: ForwardBackwardCursorMetrics, + /// Metrics for SequenceAware and RateLimited cursors. + pub cursor_metrics: Arc, } impl ContractSyncMetrics { @@ -48,14 +50,13 @@ impl ContractSyncMetrics { .expect("failed to register stored_events metric"); let message_nonce = metrics.last_known_message_nonce(); - - let forward_backward_cursor_metrics = ForwardBackwardCursorMetrics::new(metrics); + let cursor_metrics = Arc::new(CursorMetrics::new(metrics)); ContractSyncMetrics { indexed_height, stored_events, message_nonce, - forward_backward_cursor_metrics, + cursor_metrics, } } } diff --git a/rust/main/hyperlane-base/src/contract_sync/mod.rs b/rust/main/hyperlane-base/src/contract_sync/mod.rs index 1fcf72a47a..12a49de0bd 100644 --- a/rust/main/hyperlane-base/src/contract_sync/mod.rs +++ b/rust/main/hyperlane-base/src/contract_sync/mod.rs @@ -312,6 +312,8 @@ where Ok(Box::new( RateLimitedContractSyncCursor::new( Arc::new(self.indexer.clone()), + Arc::clone(&self.metrics.cursor_metrics), + self.domain(), self.store.clone(), index_settings.chunk_size, index_settings.from, @@ -353,7 +355,7 @@ where Ok(Box::new( ForwardBackwardSequenceAwareSyncCursor::new( self.domain(), - self.metrics.forward_backward_cursor_metrics.clone(), + Arc::clone(&self.metrics.cursor_metrics), self.indexer.clone(), Arc::new(self.store.clone()), index_settings.chunk_size, From 23af6397d8436e7a4c3bab7b317919c4f6124787 Mon Sep 17 00:00:00 2001 From: "Mantas M." Date: Wed, 27 Nov 2024 20:20:17 +0100 Subject: [PATCH 3/7] chore(cursor-metrics): clean arc clone syntax --- rust/main/hyperlane-base/src/contract_sync/mod.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/rust/main/hyperlane-base/src/contract_sync/mod.rs b/rust/main/hyperlane-base/src/contract_sync/mod.rs index 12a49de0bd..c9048b4808 100644 --- a/rust/main/hyperlane-base/src/contract_sync/mod.rs +++ b/rust/main/hyperlane-base/src/contract_sync/mod.rs @@ -312,7 +312,7 @@ where Ok(Box::new( RateLimitedContractSyncCursor::new( Arc::new(self.indexer.clone()), - Arc::clone(&self.metrics.cursor_metrics), + self.metrics.cursor_metrics.clone(), self.domain(), self.store.clone(), index_settings.chunk_size, @@ -355,7 +355,7 @@ where Ok(Box::new( ForwardBackwardSequenceAwareSyncCursor::new( self.domain(), - Arc::clone(&self.metrics.cursor_metrics), + self.metrics.cursor_metrics.clone(), self.indexer.clone(), Arc::new(self.store.clone()), index_settings.chunk_size, From 5d883824fa990b0dd2b81166e459754be57614e0 Mon Sep 17 00:00:00 2001 From: Mantas-M <120508669+Mantas-M@users.noreply.github.com> Date: Mon, 9 Dec 2024 15:31:30 +0100 Subject: [PATCH 4/7] Update rust/main/hyperlane-base/src/contract_sync/cursors/metrics.rs --- rust/main/hyperlane-base/src/contract_sync/cursors/metrics.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rust/main/hyperlane-base/src/contract_sync/cursors/metrics.rs b/rust/main/hyperlane-base/src/contract_sync/cursors/metrics.rs index 6475ab2c82..adc1564b9d 100644 --- a/rust/main/hyperlane-base/src/contract_sync/cursors/metrics.rs +++ b/rust/main/hyperlane-base/src/contract_sync/cursors/metrics.rs @@ -10,7 +10,7 @@ pub struct CursorMetrics { /// Labels: /// - `event_type`: the event type the cursor is indexing. Could be anything implementing `Indexable`. /// - `chain`: Chain the cursor is collecting data from. - /// - `cursor_type`: The type of cursor. E.g. `forward_sequenced`, `backward_sequenced`, `forward_rate_limited` or `backward_rate_limited`. + /// - `cursor_type`: The type of cursor. E.g. `forward_sequenced`, `backward_sequenced`, `forward_rate_limited`. pub cursor_current_block: IntGaugeVec, /// Current sequence of the cursor. From adaeecee6e150b8580dee386101552c7f8a6a5d2 Mon Sep 17 00:00:00 2001 From: Mantas-M <120508669+Mantas-M@users.noreply.github.com> Date: Mon, 9 Dec 2024 15:32:00 +0100 Subject: [PATCH 5/7] Update rust/main/hyperlane-base/src/contract_sync/cursors/metrics.rs --- rust/main/hyperlane-base/src/contract_sync/cursors/metrics.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rust/main/hyperlane-base/src/contract_sync/cursors/metrics.rs b/rust/main/hyperlane-base/src/contract_sync/cursors/metrics.rs index adc1564b9d..8918da99f9 100644 --- a/rust/main/hyperlane-base/src/contract_sync/cursors/metrics.rs +++ b/rust/main/hyperlane-base/src/contract_sync/cursors/metrics.rs @@ -18,7 +18,7 @@ pub struct CursorMetrics { /// Labels: /// - `event_type`: the event type the cursor is indexing. Could be anything implementing `Indexable`. /// - `chain`: Chain the cursor is collecting data from. - /// - `cursor_type`: The type of cursor. E.g. `forward_sequenced`, `backward_sequenced`, `forward_rate_limited` or `backward_rate_limited`. + /// - `cursor_type`: The type of cursor. E.g. `forward_sequenced`, `backward_sequenced`, `forward_rate_limited`. pub cursor_current_sequence: IntGaugeVec, /// Max sequence of the cursor. From f98afce661b297dfd2ec8e2a67bc12c4984c6914 Mon Sep 17 00:00:00 2001 From: Mantas-M <120508669+Mantas-M@users.noreply.github.com> Date: Mon, 9 Dec 2024 15:32:51 +0100 Subject: [PATCH 6/7] Update rust/main/hyperlane-base/src/contract_sync/cursors/rate_limited.rs Co-authored-by: Danil Nemirovsky --- .../hyperlane-base/src/contract_sync/cursors/rate_limited.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rust/main/hyperlane-base/src/contract_sync/cursors/rate_limited.rs b/rust/main/hyperlane-base/src/contract_sync/cursors/rate_limited.rs index a654c13bad..88c5784824 100644 --- a/rust/main/hyperlane-base/src/contract_sync/cursors/rate_limited.rs +++ b/rust/main/hyperlane-base/src/contract_sync/cursors/rate_limited.rs @@ -182,7 +182,7 @@ impl RateLimitedContractSyncCursor #[async_trait] impl ContractSyncCursor for RateLimitedContractSyncCursor where - T: Send + Sync + Debug + 'static + Indexable, + T: Indexable + Send + Sync + Debug + 'static, { async fn next_action(&mut self) -> Result<(CursorAction, Duration)> { let eta = self.sync_eta(); From 652d9b70fa85c0d48f5292c1f41e490db9ffea27 Mon Sep 17 00:00:00 2001 From: "Mantas M." Date: Mon, 16 Dec 2024 14:02:07 +0100 Subject: [PATCH 7/7] fix(cursor-metrics): store metrics on individual cursors for seq aware cursors --- .../cursors/sequence_aware/backward.rs | 63 ++++++++-- .../cursors/sequence_aware/forward.rs | 118 ++++++++++++++++-- .../cursors/sequence_aware/mod.rs | 61 +++------ 3 files changed, 180 insertions(+), 62 deletions(-) diff --git a/rust/main/hyperlane-base/src/contract_sync/cursors/sequence_aware/backward.rs b/rust/main/hyperlane-base/src/contract_sync/cursors/sequence_aware/backward.rs index 866ae61fc7..d7c9396ca9 100644 --- a/rust/main/hyperlane-base/src/contract_sync/cursors/sequence_aware/backward.rs +++ b/rust/main/hyperlane-base/src/contract_sync/cursors/sequence_aware/backward.rs @@ -4,15 +4,19 @@ use std::{collections::HashSet, fmt::Debug, ops::RangeInclusive, sync::Arc, time use async_trait::async_trait; use eyre::Result; -use hyperlane_core::{ - indexed_to_sequence_indexed_array, ContractSyncCursor, CursorAction, - HyperlaneSequenceAwareIndexerStoreReader, IndexMode, Indexed, LogMeta, SequenceIndexed, -}; use itertools::Itertools; +use maplit::hashmap; use tokio::time::sleep; use tracing::{debug, instrument, warn}; -use super::{LastIndexedSnapshot, TargetSnapshot}; +use hyperlane_core::{ + indexed_to_sequence_indexed_array, ContractSyncCursor, CursorAction, HyperlaneDomain, + HyperlaneSequenceAwareIndexerStoreReader, IndexMode, Indexed, LogMeta, SequenceIndexed, +}; + +use crate::cursors::Indexable; + +use super::{CursorMetrics, LastIndexedSnapshot, MetricsData, TargetSnapshot}; const MAX_BACKWARD_SYNC_BLOCKING_TIME: Duration = Duration::from_secs(5); @@ -33,6 +37,10 @@ pub(crate) struct BackwardSequenceAwareSyncCursor { current_indexing_snapshot: Option, /// The mode of indexing to use. index_mode: IndexMode, + /// The domain of the cursor. + domain: HyperlaneDomain, + /// Cursor metrics. + metrics: Arc, } impl Debug for BackwardSequenceAwareSyncCursor { @@ -42,13 +50,14 @@ impl Debug for BackwardSequenceAwareSyncCursor { .field("last_indexed_snapshot", &self.last_indexed_snapshot) .field("current_indexing_snapshot", &self.current_indexing_snapshot) .field("index_mode", &self.index_mode) + .field("domain", &self.domain) .finish() } } -impl BackwardSequenceAwareSyncCursor { +impl BackwardSequenceAwareSyncCursor { #[instrument( - skip(store), + skip(store, metrics_data), fields(chunk_size, next_sequence, start_block, index_mode), ret )] @@ -58,6 +67,7 @@ impl BackwardSequenceAwareSyncCursor { current_sequence_count: u32, start_block: u32, index_mode: IndexMode, + metrics_data: MetricsData, ) -> Self { // If the current sequence count is 0, we haven't indexed anything yet. // Otherwise, consider the current sequence count as the last indexed snapshot, @@ -66,6 +76,7 @@ impl BackwardSequenceAwareSyncCursor { sequence: (current_sequence_count > 0).then_some(current_sequence_count), at_block: start_block, }; + let MetricsData { domain, metrics } = metrics_data; Self { chunk_size, @@ -73,6 +84,8 @@ impl BackwardSequenceAwareSyncCursor { current_indexing_snapshot: last_indexed_snapshot.previous_target(), last_indexed_snapshot, index_mode, + domain, + metrics, } } @@ -330,10 +343,31 @@ impl BackwardSequenceAwareSyncCursor { fn rewind(&mut self) { self.current_indexing_snapshot = self.last_indexed_snapshot.previous_target(); } + + /// Updates the cursor metrics. + async fn update_metrics(&self) { + let labels = hashmap! { + "event_type" => T::name(), + "chain" => self.domain.name(), + "cursor_type" => "backward_sequenced", + }; + + let latest_block = self.latest_queried_block(); + self.metrics + .cursor_current_block + .with(&labels) + .set(latest_block as i64); + + let sequence = self.last_sequence(); + self.metrics + .cursor_current_sequence + .with(&labels) + .set(sequence as i64); + } } #[async_trait] -impl ContractSyncCursor +impl ContractSyncCursor for BackwardSequenceAwareSyncCursor { async fn next_action(&mut self) -> Result<(CursorAction, Duration)> { @@ -369,6 +403,7 @@ impl ContractSyncCursor logs: Vec<(Indexed, LogMeta)>, range: RangeInclusive, ) -> Result<()> { + self.update_metrics().await; let Some(current_indexing_snapshot) = self.current_indexing_snapshot.clone() else { // We're synced, no need to update at all. return Ok(()); @@ -405,6 +440,8 @@ impl ContractSyncCursor #[cfg(test)] mod test { + use hyperlane_core::HyperlaneDomain; + use super::super::forward::test::*; use super::*; @@ -440,12 +477,17 @@ mod test { ], }); + let metrics_data = MetricsData { + domain: HyperlaneDomain::new_test_domain("test"), + metrics: Arc::new(mock_cursor_metrics()), + }; let mut cursor = BackwardSequenceAwareSyncCursor::new( chunk_size, db, INITIAL_SEQUENCE_COUNT, INITIAL_START_BLOCK, mode, + metrics_data, ); // Skip any already indexed logs and sanity check we start at the correct spot. @@ -772,12 +814,17 @@ mod test { .collect(), }); + let metrics_data = MetricsData { + domain: HyperlaneDomain::new_test_domain("test"), + metrics: Arc::new(mock_cursor_metrics()), + }; let mut cursor = BackwardSequenceAwareSyncCursor::new( CHUNK_SIZE, db, INITIAL_SEQUENCE_COUNT, INITIAL_START_BLOCK, INDEX_MODE, + metrics_data, ); // We're fully synced, so expect no range diff --git a/rust/main/hyperlane-base/src/contract_sync/cursors/sequence_aware/forward.rs b/rust/main/hyperlane-base/src/contract_sync/cursors/sequence_aware/forward.rs index 0aa8620f1c..79917dd663 100644 --- a/rust/main/hyperlane-base/src/contract_sync/cursors/sequence_aware/forward.rs +++ b/rust/main/hyperlane-base/src/contract_sync/cursors/sequence_aware/forward.rs @@ -7,15 +7,19 @@ use std::{ use async_trait::async_trait; use eyre::Result; +use itertools::Itertools; +use maplit::hashmap; +use tracing::{debug, instrument, warn}; + use hyperlane_core::{ - indexed_to_sequence_indexed_array, ContractSyncCursor, CursorAction, + indexed_to_sequence_indexed_array, ContractSyncCursor, CursorAction, HyperlaneDomain, HyperlaneSequenceAwareIndexerStoreReader, IndexMode, Indexed, LogMeta, SequenceAwareIndexer, SequenceIndexed, }; -use itertools::Itertools; -use tracing::{debug, instrument, warn}; -use super::{LastIndexedSnapshot, TargetSnapshot}; +use crate::cursors::Indexable; + +use super::{CursorMetrics, LastIndexedSnapshot, MetricsData, TargetSnapshot}; /// A sequence-aware cursor that syncs forwards in perpetuity. pub(crate) struct ForwardSequenceAwareSyncCursor { @@ -39,6 +43,10 @@ pub(crate) struct ForwardSequenceAwareSyncCursor { target_snapshot: Option, /// The mode of indexing. index_mode: IndexMode, + /// The domain the cursor is indexing. + domain: HyperlaneDomain, + /// Cursor metrics. + metrics: Arc, } impl Debug for ForwardSequenceAwareSyncCursor { @@ -49,13 +57,14 @@ impl Debug for ForwardSequenceAwareSyncCursor { .field("current_indexing_snapshot", &self.current_indexing_snapshot) .field("target_snapshot", &self.target_snapshot) .field("index_mode", &self.index_mode) + .field("domain", &self.domain) .finish() } } -impl ForwardSequenceAwareSyncCursor { +impl ForwardSequenceAwareSyncCursor { #[instrument( - skip(store, latest_sequence_querier), + skip(store, latest_sequence_querier, metrics_data), fields(chunk_size, next_sequence, start_block, index_mode), ret )] @@ -66,6 +75,7 @@ impl ForwardSequenceAwareSyncCursor { next_sequence: u32, start_block: u32, index_mode: IndexMode, + metrics_data: MetricsData, ) -> Self { // If the next sequence is 0, we're starting from the beginning and haven't // indexed anything yet. @@ -73,6 +83,7 @@ impl ForwardSequenceAwareSyncCursor { sequence: (next_sequence > 0).then(|| next_sequence.saturating_sub(1)), at_block: start_block, }; + let MetricsData { domain, metrics } = metrics_data; Self { chunk_size, @@ -85,6 +96,8 @@ impl ForwardSequenceAwareSyncCursor { }, target_snapshot: None, index_mode, + domain, + metrics, } } @@ -417,10 +430,38 @@ impl ForwardSequenceAwareSyncCursor { fn rewind(&mut self) { self.current_indexing_snapshot = self.last_indexed_snapshot.next_target(); } + + // Updates the cursor metrics. + async fn update_metrics(&self) { + let mut labels = hashmap! { + "event_type" => T::name(), + "chain" => self.domain.name(), + "cursor_type" => "forward_sequenced", + }; + + let latest_block = self.latest_queried_block(); + self.metrics + .cursor_current_block + .with(&labels) + .set(latest_block as i64); + + let sequence = self.last_sequence(); + self.metrics + .cursor_current_sequence + .with(&labels) + .set(sequence as i64); + + labels.remove("cursor_type"); + let max_sequence = self.target_sequence().await as i64; + self.metrics + .cursor_max_sequence + .with(&labels) + .set(max_sequence); + } } #[async_trait] -impl ContractSyncCursor +impl ContractSyncCursor for ForwardSequenceAwareSyncCursor { async fn next_action(&mut self) -> Result<(CursorAction, Duration)> { @@ -460,6 +501,7 @@ impl ContractSyncCursor logs: Vec<(Indexed, LogMeta)>, range: RangeInclusive, ) -> Result<()> { + self.update_metrics().await; // Remove any sequence duplicates, filter out any logs preceding our current snapshot, // and sort in ascending order. let logs = indexed_to_sequence_indexed_array(logs)? @@ -485,7 +527,11 @@ impl ContractSyncCursor #[cfg(test)] pub(crate) mod test { use derive_new::new; - use hyperlane_core::{ChainResult, HyperlaneLogStore, Indexed, Indexer, Sequenced}; + use hyperlane_core::{ + ChainResult, HyperlaneDomainProtocol, HyperlaneLogStore, Indexed, Indexer, Sequenced, + }; + + use crate::cursors::CursorType; use super::*; @@ -498,7 +544,7 @@ pub(crate) mod test { #[async_trait] impl SequenceAwareIndexer for MockLatestSequenceQuerier where - T: Sequenced + Debug, + T: Sequenced + Debug + Clone + Send + Sync + Indexable + 'static, { async fn latest_sequence_count_and_tip(&self) -> ChainResult<(Option, u32)> { Ok((self.latest_sequence_count, self.tip)) @@ -508,7 +554,7 @@ pub(crate) mod test { #[async_trait] impl Indexer for MockLatestSequenceQuerier where - T: Sequenced + Debug, + T: Sequenced + Debug + Clone + Send + Sync + Indexable + 'static, { async fn fetch_logs_in_range( &self, @@ -528,15 +574,17 @@ pub(crate) mod test { } #[async_trait] - impl HyperlaneLogStore for MockHyperlaneSequenceAwareIndexerStore { + impl HyperlaneLogStore + for MockHyperlaneSequenceAwareIndexerStore + { async fn store_logs(&self, logs: &[(Indexed, LogMeta)]) -> eyre::Result { Ok(logs.len() as u32) } } #[async_trait] - impl HyperlaneSequenceAwareIndexerStoreReader - for MockHyperlaneSequenceAwareIndexerStore + impl + HyperlaneSequenceAwareIndexerStoreReader for MockHyperlaneSequenceAwareIndexerStore { async fn retrieve_by_sequence(&self, sequence: u32) -> eyre::Result> { Ok(self @@ -563,6 +611,9 @@ pub(crate) mod test { pub sequence: u32, } + unsafe impl Sync for MockSequencedData {} + unsafe impl Send for MockSequencedData {} + impl From for Indexed { fn from(val: MockSequencedData) -> Self { let sequence = val.sequence; @@ -570,12 +621,48 @@ pub(crate) mod test { } } + impl Indexable for MockSequencedData { + fn indexing_cursor(_domain: HyperlaneDomainProtocol) -> CursorType { + CursorType::SequenceAware + } + + fn name() -> &'static str { + "mock_indexable" + } + } + impl Sequenced for MockSequencedData { fn sequence(&self) -> Option { Some(self.sequence) } } + pub fn mock_cursor_metrics() -> CursorMetrics { + CursorMetrics { + cursor_current_block: prometheus::IntGaugeVec::new( + prometheus::Opts::new("cursor_current_block", "Current block of the cursor") + .namespace("mock") + .subsystem("cursor"), + &["event_type", "chain", "cursor_type"], + ) + .unwrap(), + cursor_current_sequence: prometheus::IntGaugeVec::new( + prometheus::Opts::new("cursor_current_sequence", "Current sequence of the cursor") + .namespace("mock") + .subsystem("cursor"), + &["event_type", "chain", "cursor_type"], + ) + .unwrap(), + cursor_max_sequence: prometheus::IntGaugeVec::new( + prometheus::Opts::new("cursor_max_sequence", "Max sequence of the cursor") + .namespace("mock") + .subsystem("cursor"), + &["event_type", "chain"], + ) + .unwrap(), + } + } + pub fn log_meta_with_block(block_number: u64) -> LogMeta { LogMeta { address: Default::default(), @@ -619,6 +706,10 @@ pub(crate) mod test { ], }); + let metrics_data = MetricsData { + domain: HyperlaneDomain::new_test_domain("test"), + metrics: Arc::new(mock_cursor_metrics()), + }; let mut cursor = ForwardSequenceAwareSyncCursor::new( chunk_size, latest_sequence_querier, @@ -627,6 +718,7 @@ pub(crate) mod test { 3, 70, mode, + metrics_data, ); // Skip any already indexed logs and sanity check we start at the correct spot. diff --git a/rust/main/hyperlane-base/src/contract_sync/cursors/sequence_aware/mod.rs b/rust/main/hyperlane-base/src/contract_sync/cursors/sequence_aware/mod.rs index 899a0be781..1e48686379 100644 --- a/rust/main/hyperlane-base/src/contract_sync/cursors/sequence_aware/mod.rs +++ b/rust/main/hyperlane-base/src/contract_sync/cursors/sequence_aware/mod.rs @@ -29,6 +29,13 @@ struct LastIndexedSnapshot { pub at_block: u32, } +/// Used to avoid going over the `instrument` macro limit. +#[derive(Debug, Clone)] +struct MetricsData { + pub domain: HyperlaneDomain, + pub metrics: Arc, +} + impl LastIndexedSnapshot { fn next_target(&self) -> TargetSnapshot { TargetSnapshot { @@ -70,8 +77,6 @@ pub(crate) struct ForwardBackwardSequenceAwareSyncCursor { forward: ForwardSequenceAwareSyncCursor, backward: BackwardSequenceAwareSyncCursor, last_direction: SyncDirection, - metrics: Arc, - domain: HyperlaneDomain, } impl @@ -92,6 +97,10 @@ impl let sequence_count = sequence_count.ok_or(ChainCommunicationError::from_other_str( "Failed to query sequence", ))?; + let metrics_data = MetricsData { + domain: domain.to_owned(), + metrics, + }; let forward_cursor = ForwardSequenceAwareSyncCursor::new( chunk_size, latest_sequence_querier.clone(), @@ -99,51 +108,22 @@ impl sequence_count, tip, mode, + metrics_data.clone(), + ); + let backward_cursor = BackwardSequenceAwareSyncCursor::new( + chunk_size, + store, + sequence_count, + tip, + mode, + metrics_data, ); - let backward_cursor = - BackwardSequenceAwareSyncCursor::new(chunk_size, store, sequence_count, tip, mode); Ok(Self { forward: forward_cursor, backward: backward_cursor, last_direction: SyncDirection::Forward, - metrics, - domain: domain.to_owned(), }) } - - async fn update_metrics(&self) { - let (cursor_type, latest_block, sequence) = match self.last_direction { - SyncDirection::Forward => ( - "forward_sequenced", - self.forward.latest_queried_block(), - self.forward.last_sequence(), - ), - SyncDirection::Backward => ( - "backward_sequenced", - self.backward.latest_queried_block(), - self.backward.last_sequence(), - ), - }; - - let chain_name = self.domain.name(); - let label_values = &[T::name(), chain_name, cursor_type]; - - self.metrics - .cursor_current_block - .with_label_values(label_values) - .set(latest_block as i64); - - self.metrics - .cursor_current_sequence - .with_label_values(label_values) - .set(sequence as i64); - - let max_sequence = self.forward.target_sequence().await as i64; - self.metrics - .cursor_max_sequence - .with_label_values(&[T::name(), chain_name]) - .set(max_sequence); - } } #[async_trait] @@ -176,7 +156,6 @@ impl ContractSyncCursor logs: Vec<(Indexed, LogMeta)>, range: RangeInclusive, ) -> Result<()> { - self.update_metrics().await; match self.last_direction { SyncDirection::Forward => self.forward.update(logs, range).await, SyncDirection::Backward => self.backward.update(logs, range).await,