From d70f62df6354f40a3a133f7ac6d9583dc7fd6252 Mon Sep 17 00:00:00 2001 From: longbowlu Date: Mon, 24 Jun 2024 22:34:21 -0700 Subject: [PATCH 1/3] query transactions --- Cargo.lock | 3 + crates/sui-bridge-indexer/Cargo.toml | 3 + crates/sui-bridge-indexer/src/config.rs | 1 + crates/sui-bridge-indexer/src/lib.rs | 6 + crates/sui-bridge-indexer/src/main.rs | 91 ++++++++-- .../up.sql | 6 + crates/sui-bridge-indexer/src/models.rs | 9 +- .../src/postgres_manager.rs | 32 ++++ crates/sui-bridge-indexer/src/schema.rs | 16 +- .../src/sui_transaction_handler.rs | 168 ++++++++++++++++++ .../src/sui_transaction_queries.rs | 46 +++++ 11 files changed, 366 insertions(+), 15 deletions(-) create mode 100644 crates/sui-bridge-indexer/src/sui_transaction_handler.rs create mode 100644 crates/sui-bridge-indexer/src/sui_transaction_queries.rs diff --git a/Cargo.lock b/Cargo.lock index 05d77b770fee5..d0b6b0bcaf654 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -12186,6 +12186,7 @@ dependencies = [ "clap", "diesel", "ethers", + "futures", "hex-literal 0.3.4", "mysten-metrics", "prometheus", @@ -12194,6 +12195,8 @@ dependencies = [ "sui-bridge", "sui-config", "sui-data-ingestion-core", + "sui-json-rpc-types", + "sui-sdk 1.29.0", "sui-test-transaction-builder", "sui-types", "telemetry-subscribers", diff --git a/crates/sui-bridge-indexer/Cargo.toml b/crates/sui-bridge-indexer/Cargo.toml index b346a0cf5d94c..9603b1f464342 100644 --- a/crates/sui-bridge-indexer/Cargo.toml +++ b/crates/sui-bridge-indexer/Cargo.toml @@ -12,6 +12,7 @@ diesel = { version = "2.1.4", features = ["postgres", "r2d2", "serde_json"] } ethers = "2.0" tokio = { workspace = true, features = ["full"] } anyhow.workspace = true +futures.workspace = true async-trait.workspace = true bcs.workspace = true bin-version.workspace = true @@ -20,6 +21,8 @@ mysten-metrics.workspace = true prometheus.workspace = true serde_yaml.workspace = true sui-bridge.workspace = true +sui-sdk.workspace = true +sui-json-rpc-types.workspace = true sui-data-ingestion-core.workspace = true sui-types.workspace = true telemetry-subscribers.workspace = true diff --git a/crates/sui-bridge-indexer/src/config.rs b/crates/sui-bridge-indexer/src/config.rs index f1c5746d6db97..ee4c75ac554a0 100644 --- a/crates/sui-bridge-indexer/src/config.rs +++ b/crates/sui-bridge-indexer/src/config.rs @@ -18,6 +18,7 @@ pub struct Config { pub start_block: u64, pub metric_url: String, pub metric_port: u16, + pub sui_rpc_url: Option, } /// Load the config to run. diff --git a/crates/sui-bridge-indexer/src/lib.rs b/crates/sui-bridge-indexer/src/lib.rs index d8d3b11c5e2ee..8fa9a7585d895 100644 --- a/crates/sui-bridge-indexer/src/lib.rs +++ b/crates/sui-bridge-indexer/src/lib.rs @@ -12,8 +12,11 @@ pub mod metrics; pub mod models; pub mod postgres_manager; pub mod schema; +pub mod sui_transaction_handler; +pub mod sui_transaction_queries; pub mod sui_worker; +#[derive(Clone)] pub struct TokenTransfer { chain_id: u8, nonce: u64, @@ -27,6 +30,7 @@ pub struct TokenTransfer { data: Option, } +#[derive(Clone)] pub struct TokenTransferData { sender_address: Vec, destination_chain: u8, @@ -66,6 +70,7 @@ impl TokenTransfer { } } +#[derive(Clone)] pub(crate) enum TokenTransferStatus { DepositedUnfinalized, Deposited, @@ -85,6 +90,7 @@ impl Display for TokenTransferStatus { } } +#[derive(Clone)] enum BridgeDataSource { Sui, Eth, diff --git a/crates/sui-bridge-indexer/src/main.rs b/crates/sui-bridge-indexer/src/main.rs index 977a69bd583fe..6c4dc9b9c9751 100644 --- a/crates/sui-bridge-indexer/src/main.rs +++ b/crates/sui-bridge-indexer/src/main.rs @@ -3,8 +3,10 @@ use anyhow::Result; use clap::*; +use mysten_metrics::spawn_logged_monitored_task; use mysten_metrics::start_prometheus_server; use prometheus::Registry; +use tokio::task::JoinHandle; use std::collections::{HashMap, HashSet}; use std::env; use std::path::PathBuf; @@ -12,10 +14,13 @@ use std::sync::Arc; use sui_bridge::eth_client::EthClient; use sui_bridge::metrics::BridgeMetrics; use sui_bridge_indexer::eth_worker::EthBridgeWorker; -use sui_bridge_indexer::postgres_manager::{get_connection_pool, PgProgressStore}; +use sui_bridge_indexer::postgres_manager::{read_sui_progress_store, get_connection_pool, PgProgressStore}; +use sui_bridge_indexer::sui_transaction_handler::handle_sui_transcations_loop; +use sui_bridge_indexer::sui_transaction_queries::start_sui_tx_polling_task; use sui_bridge_indexer::sui_worker::SuiBridgeWorker; use sui_bridge_indexer::{config::load_config, metrics::BridgeIndexerMetrics}; use sui_data_ingestion_core::{DataIngestionMetrics, IndexerExecutor, ReaderOptions, WorkerPool}; +use sui_sdk::SuiClientBuilder; use sui_types::messages_checkpoint::CheckpointSequenceNumber; use tokio::sync::oneshot; @@ -86,18 +91,26 @@ async fn main() -> Result<()> { .map_err(|e| anyhow::anyhow!(e.to_string()))?, ); - let unfinalized_handle = eth_worker.start_indexing_unfinalized_events(eth_client.clone()); - let finalized_handle = eth_worker.start_indexing_finalized_events(eth_client.clone()); - + let unfinalized_handle = eth_worker.start_indexing_unfinalized_events(eth_client.clone()).await.unwrap(); + let finalized_handle = eth_worker.start_indexing_finalized_events(eth_client.clone()).await.unwrap(); + let handles = vec![unfinalized_handle, finalized_handle]; // TODO: add retry_with_max_elapsed_time - let progress = start_processing_sui_checkpoints( - &config_clone, - db_url, - indexer_meterics, - ingestion_metrics, - ); + if let Some(sui_rpc_url) = config.sui_rpc_url.clone() { + start_processing_sui_checkpoints_by_querying_txes( + sui_rpc_url, + db_url.clone(), + indexer_meterics.clone(), + ).await.unwrap(); + } else { + let _ = start_processing_sui_checkpoints( + &config_clone, + db_url, + indexer_meterics, + ingestion_metrics, + ).await; + } - let _ = tokio::try_join!(finalized_handle, unfinalized_handle, progress); + let _ = futures::future::join_all(handles).await; Ok(()) } @@ -137,3 +150,59 @@ async fn start_processing_sui_checkpoints( ) .await } + +async fn start_processing_sui_checkpoints_by_querying_txes( + sui_rpc_url: String, + db_url: String, + indexer_meterics: BridgeIndexerMetrics, +) -> Result>> { + // metrics init + + let pg_pool = get_connection_pool(db_url.clone()); + let (tx, rx) = mysten_metrics::metered_channel::channel( + 100, + &mysten_metrics::get_metrics() + .unwrap() + .channel_inflight + .with_label_values(&["sui_transaction_processing_queue"]), + ); + let mut handles = vec![]; + // FIXME cursor + let cursor = read_sui_progress_store(&pg_pool).unwrap(); + let sui_client = SuiClientBuilder::default().build(sui_rpc_url).await?; + handles.push(spawn_logged_monitored_task!( + start_sui_tx_polling_task(sui_client, cursor, tx), + "start_sui_tx_polling_task" + )); + handles.push(spawn_logged_monitored_task!( + handle_sui_transcations_loop(pg_pool.clone(), rx, indexer_meterics.clone(),), + "handle_sui_transcations_loop" + )); + Ok(handles) + // let (_exit_sender, exit_receiver) = oneshot::channel(); + + // let progress_store = PgProgressStore::new(pg_pool, config.bridge_genesis_checkpoint); + // let mut executor = IndexerExecutor::new( + // progress_store, + // 1, /* workflow types */ + // ingestion_metrics, + // ); + + // let indexer_metrics_cloned = indexer_meterics.clone(); + + // let worker_pool = WorkerPool::new( + // SuiBridgeWorker::new(vec![], db_url, indexer_metrics_cloned), + // "bridge worker".into(), + // config.concurrency as usize, + // ); + // executor.register(worker_pool).await?; + // executor + // .run( + // config.checkpoints_path.clone().into(), + // Some(config.remote_store_url.clone()), + // vec![], // optional remote store access options + // ReaderOptions::default(), + // exit_receiver, + // ) + // .await +} diff --git a/crates/sui-bridge-indexer/src/migrations/00000000000000_diesel_initial_setup/up.sql b/crates/sui-bridge-indexer/src/migrations/00000000000000_diesel_initial_setup/up.sql index 93e5762bfd07e..a477593fb6d0b 100644 --- a/crates/sui-bridge-indexer/src/migrations/00000000000000_diesel_initial_setup/up.sql +++ b/crates/sui-bridge-indexer/src/migrations/00000000000000_diesel_initial_setup/up.sql @@ -39,3 +39,9 @@ CREATE TABLE progress_store task_name TEXT PRIMARY KEY, checkpoint BIGINT NOT NULL ); + +CREATE TABLE sui_progress_store +( + id INT PRIMARY KEY, + txn_digest bytea NOT NULL +); diff --git a/crates/sui-bridge-indexer/src/models.rs b/crates/sui-bridge-indexer/src/models.rs index e3bd537be945f..56aeecea4f925 100644 --- a/crates/sui-bridge-indexer/src/models.rs +++ b/crates/sui-bridge-indexer/src/models.rs @@ -1,7 +1,7 @@ // Copyright (c) Mysten Labs, Inc. // SPDX-License-Identifier: Apache-2.0 -use crate::schema::{progress_store, token_transfer, token_transfer_data}; +use crate::schema::{progress_store, token_transfer, token_transfer_data, sui_progress_store}; use diesel::{Identifiable, Insertable, Queryable, Selectable}; #[derive(Queryable, Selectable, Insertable, Identifiable, Debug)] @@ -11,6 +11,13 @@ pub struct ProgressStore { pub checkpoint: i64, } +#[derive(Queryable, Selectable, Insertable, Identifiable, Debug)] +#[diesel(table_name = sui_progress_store, primary_key(txn_digest))] +pub struct SuiProgressStore { + pub id: i32, // Dummy value + pub txn_digest: Vec, +} + #[derive(Queryable, Selectable, Insertable, Identifiable, Debug)] #[diesel(table_name = token_transfer, primary_key(chain_id, nonce))] pub struct TokenTransfer { diff --git a/crates/sui-bridge-indexer/src/postgres_manager.rs b/crates/sui-bridge-indexer/src/postgres_manager.rs index 73035a048ff06..e92cea97fef9c 100644 --- a/crates/sui-bridge-indexer/src/postgres_manager.rs +++ b/crates/sui-bridge-indexer/src/postgres_manager.rs @@ -2,10 +2,12 @@ // SPDX-License-Identifier: Apache-2.0 use crate::models::ProgressStore as DBProgressStore; +use crate::models::SuiProgressStore; use crate::models::TokenTransfer as DBTokenTransfer; use crate::models::TokenTransferData as DBTokenTransferData; use crate::schema::progress_store::checkpoint; use crate::schema::progress_store::dsl::progress_store; +use crate::schema::sui_progress_store::txn_digest; use crate::schema::token_transfer_data; use crate::{schema, schema::token_transfer, TokenTransfer}; use async_trait::async_trait; @@ -17,10 +19,13 @@ use diesel::{ Connection, ExpressionMethods, OptionalExtension, QueryDsl, RunQueryDsl, SelectableHelper, }; use sui_data_ingestion_core::ProgressStore; +use sui_types::digests::TransactionDigest; use sui_types::messages_checkpoint::CheckpointSequenceNumber; pub(crate) type PgPool = Pool>; +const SUI_PROGRESS_STORE_DUMMY_KEY: i32 = 1; + pub fn get_connection_pool(database_url: String) -> PgPool { let manager = ConnectionManager::::new(database_url); Pool::builder() @@ -52,6 +57,33 @@ pub fn write(pool: &PgPool, token_txns: Vec) -> Result<(), anyhow Ok(()) } +pub fn update_sui_progress_store(pool: &PgPool, tx_digest: TransactionDigest) -> Result<(), anyhow::Error> { + let mut conn = pool.get()?; + diesel::insert_into(schema::sui_progress_store::table) + .values(&SuiProgressStore { + id: SUI_PROGRESS_STORE_DUMMY_KEY, + txn_digest: tx_digest.inner().to_vec(), + }) + .on_conflict(schema::sui_progress_store::dsl::id) + .do_update() + .set(txn_digest.eq(tx_digest.inner().to_vec())) + .execute(&mut conn)?; + Ok(()) +} + +pub fn read_sui_progress_store(pool: &PgPool) -> anyhow::Result> { + let mut conn = pool.get()?; + let val: Option = crate::schema::sui_progress_store::dsl::sui_progress_store + .select(SuiProgressStore::as_select()) + .first(&mut conn) + .optional()?; + match val { + Some(val) => Ok(Some(TransactionDigest::try_from(val.txn_digest.as_slice())?)), + None => Ok(None), + } +} + + pub fn get_latest_eth_token_transfer( pool: &PgPool, finalized: bool, diff --git a/crates/sui-bridge-indexer/src/schema.rs b/crates/sui-bridge-indexer/src/schema.rs index 58e1aef7a0002..1952393d948fc 100644 --- a/crates/sui-bridge-indexer/src/schema.rs +++ b/crates/sui-bridge-indexer/src/schema.rs @@ -1,5 +1,3 @@ -// Copyright (c) Mysten Labs, Inc. -// SPDX-License-Identifier: Apache-2.0 // @generated automatically by Diesel CLI. diesel::table! { @@ -9,6 +7,13 @@ diesel::table! { } } +diesel::table! { + sui_progress_store (id) { + id -> Int4, + txn_digest -> Bytea, + } +} + diesel::table! { token_transfer (chain_id, nonce, status) { chain_id -> Int4, @@ -38,4 +43,9 @@ diesel::table! { } } -diesel::allow_tables_to_appear_in_same_query!(progress_store, token_transfer, token_transfer_data,); +diesel::allow_tables_to_appear_in_same_query!( + progress_store, + sui_progress_store, + token_transfer, + token_transfer_data, +); diff --git a/crates/sui-bridge-indexer/src/sui_transaction_handler.rs b/crates/sui-bridge-indexer/src/sui_transaction_handler.rs new file mode 100644 index 0000000000000..f123234344f8e --- /dev/null +++ b/crates/sui-bridge-indexer/src/sui_transaction_handler.rs @@ -0,0 +1,168 @@ +use crate::postgres_manager::{update_sui_progress_store, write, PgPool, PgProgressStore}; +use crate::metrics::BridgeIndexerMetrics; +use crate::{BridgeDataSource, TokenTransfer, TokenTransferData, TokenTransferStatus}; +use anyhow::Result; +use futures::StreamExt; +use sui_types::digests::TransactionDigest; + +use std::time::Duration; +use sui_bridge::events::{ + MoveTokenDepositedEvent, MoveTokenTransferApproved, MoveTokenTransferClaimed, +}; + +use sui_json_rpc_types::{ + SuiTransactionBlockEffectsAPI, SuiTransactionBlockResponse, +}; + +use sui_types::BRIDGE_ADDRESS; +use tracing::{error, info}; + +pub(crate) const COMMIT_BATCH_SIZE: usize = 10; + +pub async fn handle_sui_transcations_loop( + pg_pool: PgPool, + rx: mysten_metrics::metered_channel::Receiver<(Vec, Option)>, + metrics: BridgeIndexerMetrics, +) { + let checkpoint_commit_batch_size = std::env::var("COMMIT_BATCH_SIZE") + .unwrap_or(COMMIT_BATCH_SIZE.to_string()) + .parse::() + .unwrap(); + let mut stream = mysten_metrics::metered_channel::ReceiverStream::new(rx) + .ready_chunks(checkpoint_commit_batch_size); + while let Some(batch) = stream.next().await { + // unwrap: batch must not be empty + let cursor = batch.last().unwrap().1.clone(); + let token_transfers = batch.into_iter().map( + // TODO: letting it panic so we can capture errors, but we should handle this more gracefully + |(chunk, _)| process_transctions(chunk, &metrics).unwrap() + ).flatten().collect::>(); + // for (chunk, _) in batch { + // let token_transfers = process_transctions(resp, &metrics).unwrap(); + if !token_transfers.is_empty() { + while let Err(err) = write(&pg_pool, token_transfers.clone()) { + error!("Failed to write sui transactions to DB: {:?}", err); + tokio::time::sleep(Duration::from_secs(5)).await; + } + info!("Wrote {} token transfers to DB", token_transfers.len()); + } + // } + if let Some(cursor) = cursor { + while let Err(err) = update_sui_progress_store(&pg_pool, cursor.clone()) { + error!("Failed to update sui progress tore DB: {:?}", err); + tokio::time::sleep(Duration::from_secs(5)).await; + } + info!("Updated sui transaction cursor to {}", cursor); + } + } + unreachable!("Channel closed unexpectedly"); +} + +fn process_transctions( + resp: Vec, + metrics: &BridgeIndexerMetrics, +) -> Result> { + resp.into_iter() + .map(|r| into_token_transfers(r, metrics)) + .collect::>>() + .map(|v| v.into_iter().flatten().collect()) +} + +pub fn into_token_transfers( + resp: SuiTransactionBlockResponse, + metrics: &BridgeIndexerMetrics, +) -> Result> { + let mut transfers = Vec::new(); + let tx_digest = resp.digest; + let events = resp.events.ok_or(anyhow::anyhow!( + "Expected events in SuiTransactionBlockResponse: {:?}", + tx_digest + ))?; + let checkpoint_num = resp.checkpoint.ok_or(anyhow::anyhow!( + "Expected checkpoint in SuiTransactionBlockResponse: {:?}", + tx_digest + ))?; + let timestamp_ms = resp.timestamp_ms.ok_or(anyhow::anyhow!( + "Expected timestamp_ms in SuiTransactionBlockResponse: {:?}", + tx_digest + ))?; + let effects = resp.effects.ok_or(anyhow::anyhow!( + "Expected effects in SuiTransactionBlockResponse: {:?}", + tx_digest + ))?; + for ev in events.data { + if ev.type_.address != BRIDGE_ADDRESS { + continue; + } + match ev.type_.name.as_str() { + "TokenDepositedEvent" => { + info!("Observed Sui Deposit {:?}", ev); + metrics.total_sui_token_deposited.inc(); + let move_event: MoveTokenDepositedEvent = bcs::from_bytes(&ev.bcs)?; + transfers.push(TokenTransfer { + chain_id: move_event.source_chain, + nonce: move_event.seq_num, + block_height: checkpoint_num, + timestamp_ms, + txn_hash: tx_digest.inner().to_vec(), + txn_sender: ev.sender.to_vec(), + status: TokenTransferStatus::Deposited, + gas_usage: effects.gas_cost_summary().net_gas_usage(), + data_source: BridgeDataSource::Sui, + data: Some(TokenTransferData { + destination_chain: move_event.target_chain, + sender_address: move_event.sender_address.clone(), + recipient_address: move_event.target_address.clone(), + token_id: move_event.token_type, + amount: move_event.amount_sui_adjusted, + }), + }); + } + "TokenTransferApproved" => { + info!("Observed Sui Approval {:?}", ev); + metrics.total_sui_token_transfer_approved.inc(); + let event: MoveTokenTransferApproved = bcs::from_bytes(&ev.bcs)?; + transfers.push(TokenTransfer { + chain_id: event.message_key.source_chain, + nonce: event.message_key.bridge_seq_num, + block_height: checkpoint_num, + timestamp_ms, + txn_hash: tx_digest.inner().to_vec(), + txn_sender: ev.sender.to_vec(), + status: TokenTransferStatus::Approved, + gas_usage: effects.gas_cost_summary().net_gas_usage(), + data_source: BridgeDataSource::Sui, + data: None, + }); + } + "TokenTransferClaimed" => { + info!("Observed Sui Claim {:?}", ev); + metrics.total_sui_token_transfer_claimed.inc(); + let event: MoveTokenTransferClaimed = bcs::from_bytes(&ev.bcs)?; + transfers.push(TokenTransfer { + chain_id: event.message_key.source_chain, + nonce: event.message_key.bridge_seq_num, + block_height: checkpoint_num, + timestamp_ms, + txn_hash: tx_digest.inner().to_vec(), + txn_sender: ev.sender.to_vec(), + status: TokenTransferStatus::Claimed, + gas_usage: effects.gas_cost_summary().net_gas_usage(), + data_source: BridgeDataSource::Sui, + data: None, + }); + } + _ => { + metrics.total_sui_bridge_txn_other.inc(); + } + } + } + if !transfers.is_empty() { + info!( + ?tx_digest, + "SUI: Extracted {} bridge token transfer data entries", + transfers.len(), + ); + } + Ok(transfers) +} diff --git a/crates/sui-bridge-indexer/src/sui_transaction_queries.rs b/crates/sui-bridge-indexer/src/sui_transaction_queries.rs new file mode 100644 index 0000000000000..268e59baee5a3 --- /dev/null +++ b/crates/sui-bridge-indexer/src/sui_transaction_queries.rs @@ -0,0 +1,46 @@ +// Copyright (c) Mysten Labs, Inc. +// SPDX-License-Identifier: Apache-2.0 + +use std::time::Duration; +use sui_json_rpc_types::SuiTransactionBlockResponseOptions; +use sui_json_rpc_types::TransactionFilter; +use sui_json_rpc_types::{SuiTransactionBlockResponse, SuiTransactionBlockResponseQuery}; +use sui_sdk::{SuiClient, SuiClientBuilder}; +use sui_types::digests::TransactionDigest; +use sui_types::SUI_BRIDGE_OBJECT_ID; + +use sui_bridge::retry_with_max_elapsed_time; +use tracing::{error, info}; + +const QUERY_DURATION: Duration = Duration::from_millis(500); + +pub async fn start_sui_tx_polling_task( + sui_client: SuiClient, + mut cursor: Option, + tx: mysten_metrics::metered_channel::Sender<(Vec, Option)>, +) { + info!("Starting SUI transaction polling task from {:?}", cursor); + loop { + let Ok(Ok(results)) = retry_with_max_elapsed_time!( + sui_client.read_api().query_transaction_blocks( + SuiTransactionBlockResponseQuery { + filter: Some(TransactionFilter::InputObject(SUI_BRIDGE_OBJECT_ID)), + options: Some(SuiTransactionBlockResponseOptions::full_content()), + }, + cursor, + None, + false, + ), + Duration::from_secs(600) + ) else { + error!("Failed to query bridge transactions after retry"); + continue; + }; + info!("Retrieved {} bridge transactions", results.data.len()); + tx.send((results.data, results.next_cursor)) + .await + .expect("Failed to send transaction block to process"); + cursor = results.next_cursor; + // tokio::time::sleep(QUERY_DURATION).await; + } +} From 9557fc29fb7bf77a323d33c8b987cfe34d050398 Mon Sep 17 00:00:00 2001 From: longbowlu Date: Tue, 25 Jun 2024 16:05:58 -0700 Subject: [PATCH 2/3] clean up and metrics --- Cargo.lock | 1 + crates/sui-bridge-indexer/Cargo.toml | 1 + crates/sui-bridge-indexer/src/eth_worker.rs | 18 ++++- .../src/latest_eth_syncer.rs | 13 +++- crates/sui-bridge-indexer/src/main.rs | 75 ++++++++----------- crates/sui-bridge-indexer/src/metrics.rs | 33 +++++++- .../up.sql | 2 +- crates/sui-bridge-indexer/src/models.rs | 2 +- .../src/postgres_manager.rs | 13 +++- crates/sui-bridge-indexer/src/schema.rs | 2 + .../src/sui_transaction_handler.rs | 47 +++++++----- .../src/sui_transaction_queries.rs | 15 +++- crates/sui-bridge-indexer/src/sui_worker.rs | 8 +- crates/sui-bridge/src/eth_syncer.rs | 24 +++++- crates/sui-bridge/src/metrics.rs | 23 +++++- crates/sui-bridge/src/node.rs | 2 +- 16 files changed, 196 insertions(+), 83 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index d0b6b0bcaf654..2424b43829281 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -12199,6 +12199,7 @@ dependencies = [ "sui-sdk 1.29.0", "sui-test-transaction-builder", "sui-types", + "tap", "telemetry-subscribers", "test-cluster", "tokio", diff --git a/crates/sui-bridge-indexer/Cargo.toml b/crates/sui-bridge-indexer/Cargo.toml index 9603b1f464342..75e75c39a93e3 100644 --- a/crates/sui-bridge-indexer/Cargo.toml +++ b/crates/sui-bridge-indexer/Cargo.toml @@ -8,6 +8,7 @@ edition = "2021" [dependencies] serde.workspace = true +tap.workspace = true diesel = { version = "2.1.4", features = ["postgres", "r2d2", "serde_json"] } ethers = "2.0" tokio = { workspace = true, features = ["full"] } diff --git a/crates/sui-bridge-indexer/src/eth_worker.rs b/crates/sui-bridge-indexer/src/eth_worker.rs index 9f68114ba4a12..bfb0e9ef88d38 100644 --- a/crates/sui-bridge-indexer/src/eth_worker.rs +++ b/crates/sui-bridge-indexer/src/eth_worker.rs @@ -16,16 +16,18 @@ use std::collections::HashMap; use std::str::FromStr; use std::sync::Arc; use sui_bridge::abi::{EthBridgeEvent, EthSuiBridgeEvents}; +use sui_bridge::metrics::BridgeMetrics; use sui_bridge::types::EthLog; use sui_bridge::{eth_client::EthClient, eth_syncer::EthSyncer}; use tokio::task::JoinHandle; use tracing::info; use tracing::log::error; -#[derive(Clone, Debug)] +#[derive(Clone)] pub struct EthBridgeWorker { provider: Arc>, pg_pool: PgPool, + bridge_metrics: Arc, metrics: BridgeIndexerMetrics, bridge_address: EthAddress, config: Config, @@ -34,6 +36,7 @@ pub struct EthBridgeWorker { impl EthBridgeWorker { pub fn new( pg_pool: PgPool, + bridge_metrics: Arc, metrics: BridgeIndexerMetrics, config: Config, ) -> Result> { @@ -47,6 +50,7 @@ impl EthBridgeWorker { Ok(Self { provider, pg_pool, + bridge_metrics, metrics, bridge_address, config, @@ -69,7 +73,7 @@ impl EthBridgeWorker { let (_task_handles, eth_events_rx, _) = EthSyncer::new(eth_client, finalized_contract_addresses) - .run() + .run(self.bridge_metrics.clone()) .await .map_err(|e| anyhow::anyhow!(format!("{:?}", e)))?; @@ -114,7 +118,7 @@ impl EthBridgeWorker { self.provider.clone(), unfinalized_contract_addresses.clone(), ) - .run() + .run(self.metrics.clone()) .await .map_err(|e| anyhow::anyhow!(format!("{:?}", e)))?; @@ -146,6 +150,11 @@ async fn process_eth_events( mut eth_events_rx: mysten_metrics::metered_channel::Receiver<(EthAddress, u64, Vec)>, finalized: bool, ) { + let progress_gauge = if finalized { + metrics.last_committed_eth_block.clone() + } else { + metrics.last_committed_unfinalized_eth_block.clone() + }; while let Some((_, _, logs)) = eth_events_rx.recv().await { for log in logs.iter() { let eth_bridge_event = EthBridgeEvent::try_from_eth_log(log); @@ -240,8 +249,11 @@ async fn process_eth_events( } }; + // TODO: we either scream here or keep retrying this until we succeed if let Err(e) = write(&pg_pool, vec![transfer]) { error!("Error writing token transfer to database: {:?}", e); + } else { + progress_gauge.set(block_number as i64); } } } diff --git a/crates/sui-bridge-indexer/src/latest_eth_syncer.rs b/crates/sui-bridge-indexer/src/latest_eth_syncer.rs index abfdc203f0938..1cd17494ad8bc 100644 --- a/crates/sui-bridge-indexer/src/latest_eth_syncer.rs +++ b/crates/sui-bridge-indexer/src/latest_eth_syncer.rs @@ -19,6 +19,8 @@ use tokio::task::JoinHandle; use tokio::time::{self, Duration}; use tracing::error; +use crate::metrics::BridgeIndexerMetrics; + const ETH_LOG_QUERY_MAX_BLOCK_RANGE: u64 = 1000; const ETH_EVENTS_CHANNEL_SIZE: usize = 1000; const BLOCK_QUERY_INTERVAL: Duration = Duration::from_secs(2); @@ -51,6 +53,7 @@ where pub async fn run( self, + metrics: BridgeIndexerMetrics, ) -> BridgeResult<( Vec>, mysten_metrics::metered_channel::Receiver<(EthAddress, u64, Vec)>, @@ -66,9 +69,9 @@ where let mut task_handles = vec![]; for (contract_address, start_block) in self.contract_addresses { let eth_events_tx_clone = eth_evnets_tx.clone(); - // let latest_block_rx_clone = latest_block_rx.clone(); let eth_client_clone = self.eth_client.clone(); let provider_clone = self.provider.clone(); + let metrics_clone = metrics.clone(); task_handles.push(spawn_logged_monitored_task!( Self::run_event_listening_task( contract_address, @@ -76,6 +79,7 @@ where provider_clone, eth_events_tx_clone, eth_client_clone, + metrics_clone, ) )); } @@ -88,6 +92,7 @@ where provider: Arc>, events_sender: mysten_metrics::metered_channel::Sender<(EthAddress, u64, Vec)>, eth_client: Arc>, + metrics: BridgeIndexerMetrics, ) { tracing::info!(contract_address=?contract_address, "Starting eth events listening task from block {start_block}"); loop { @@ -124,6 +129,7 @@ where continue; }; let len = events.len(); + let last_block = events.last().map(|e| e.block_number); // Note 1: we always events to the channel even when it is empty. This is because of // how `eth_getLogs` api is designed - we want cursor to move forward continuously. @@ -143,6 +149,11 @@ where "Observed {len} new Eth events", ); } + if let Some(last_block) = last_block { + metrics + .last_synced_unfinalized_eth_block + .set(last_block as i64); + } start_block = end_block + 1; } } diff --git a/crates/sui-bridge-indexer/src/main.rs b/crates/sui-bridge-indexer/src/main.rs index 6c4dc9b9c9751..8c096c957aed8 100644 --- a/crates/sui-bridge-indexer/src/main.rs +++ b/crates/sui-bridge-indexer/src/main.rs @@ -6,7 +6,6 @@ use clap::*; use mysten_metrics::spawn_logged_monitored_task; use mysten_metrics::start_prometheus_server; use prometheus::Registry; -use tokio::task::JoinHandle; use std::collections::{HashMap, HashSet}; use std::env; use std::path::PathBuf; @@ -14,7 +13,9 @@ use std::sync::Arc; use sui_bridge::eth_client::EthClient; use sui_bridge::metrics::BridgeMetrics; use sui_bridge_indexer::eth_worker::EthBridgeWorker; -use sui_bridge_indexer::postgres_manager::{read_sui_progress_store, get_connection_pool, PgProgressStore}; +use sui_bridge_indexer::postgres_manager::{ + get_connection_pool, read_sui_progress_store, PgProgressStore, +}; use sui_bridge_indexer::sui_transaction_handler::handle_sui_transcations_loop; use sui_bridge_indexer::sui_transaction_queries::start_sui_tx_polling_task; use sui_bridge_indexer::sui_worker::SuiBridgeWorker; @@ -22,6 +23,7 @@ use sui_bridge_indexer::{config::load_config, metrics::BridgeIndexerMetrics}; use sui_data_ingestion_core::{DataIngestionMetrics, IndexerExecutor, ReaderOptions, WorkerPool}; use sui_sdk::SuiClientBuilder; use sui_types::messages_checkpoint::CheckpointSequenceNumber; +use tokio::task::JoinHandle; use tokio::sync::oneshot; use tracing::info; @@ -76,6 +78,7 @@ async fn main() -> Result<()> { // TODO: retry_with_max_elapsed_time let eth_worker = EthBridgeWorker::new( get_connection_pool(db_url.clone()), + bridge_metrics.clone(), indexer_meterics.clone(), config.clone(), ) @@ -85,31 +88,42 @@ async fn main() -> Result<()> { EthClient::::new( &config.eth_rpc_url, HashSet::from_iter(vec![eth_worker.bridge_address()]), - bridge_metrics, + bridge_metrics.clone(), ) .await .map_err(|e| anyhow::anyhow!(e.to_string()))?, ); - let unfinalized_handle = eth_worker.start_indexing_unfinalized_events(eth_client.clone()).await.unwrap(); - let finalized_handle = eth_worker.start_indexing_finalized_events(eth_client.clone()).await.unwrap(); + let unfinalized_handle = eth_worker + .start_indexing_unfinalized_events(eth_client.clone()) + .await + .unwrap(); + let finalized_handle = eth_worker + .start_indexing_finalized_events(eth_client.clone()) + .await + .unwrap(); let handles = vec![unfinalized_handle, finalized_handle]; - // TODO: add retry_with_max_elapsed_time + if let Some(sui_rpc_url) = config.sui_rpc_url.clone() { start_processing_sui_checkpoints_by_querying_txes( sui_rpc_url, db_url.clone(), indexer_meterics.clone(), - ).await.unwrap(); + bridge_metrics, + ) + .await + .unwrap(); } else { let _ = start_processing_sui_checkpoints( &config_clone, db_url, indexer_meterics, ingestion_metrics, - ).await; + ) + .await; } + // We are not waiting for the sui tasks to finish here, which is ok. let _ = futures::future::join_all(handles).await; Ok(()) @@ -154,55 +168,28 @@ async fn start_processing_sui_checkpoints( async fn start_processing_sui_checkpoints_by_querying_txes( sui_rpc_url: String, db_url: String, - indexer_meterics: BridgeIndexerMetrics, + indexer_metrics: BridgeIndexerMetrics, + bridge_metrics: Arc, ) -> Result>> { - // metrics init - let pg_pool = get_connection_pool(db_url.clone()); let (tx, rx) = mysten_metrics::metered_channel::channel( 100, &mysten_metrics::get_metrics() - .unwrap() - .channel_inflight - .with_label_values(&["sui_transaction_processing_queue"]), + .unwrap() + .channel_inflight + .with_label_values(&["sui_transaction_processing_queue"]), ); let mut handles = vec![]; - // FIXME cursor - let cursor = read_sui_progress_store(&pg_pool).unwrap(); + let cursor = + read_sui_progress_store(&pg_pool).expect("Failed to read cursor from sui progress store"); let sui_client = SuiClientBuilder::default().build(sui_rpc_url).await?; handles.push(spawn_logged_monitored_task!( - start_sui_tx_polling_task(sui_client, cursor, tx), + start_sui_tx_polling_task(sui_client, cursor, tx, bridge_metrics), "start_sui_tx_polling_task" )); handles.push(spawn_logged_monitored_task!( - handle_sui_transcations_loop(pg_pool.clone(), rx, indexer_meterics.clone(),), + handle_sui_transcations_loop(pg_pool.clone(), rx, indexer_metrics.clone()), "handle_sui_transcations_loop" )); Ok(handles) - // let (_exit_sender, exit_receiver) = oneshot::channel(); - - // let progress_store = PgProgressStore::new(pg_pool, config.bridge_genesis_checkpoint); - // let mut executor = IndexerExecutor::new( - // progress_store, - // 1, /* workflow types */ - // ingestion_metrics, - // ); - - // let indexer_metrics_cloned = indexer_meterics.clone(); - - // let worker_pool = WorkerPool::new( - // SuiBridgeWorker::new(vec![], db_url, indexer_metrics_cloned), - // "bridge worker".into(), - // config.concurrency as usize, - // ); - // executor.register(worker_pool).await?; - // executor - // .run( - // config.checkpoints_path.clone().into(), - // Some(config.remote_store_url.clone()), - // vec![], // optional remote store access options - // ReaderOptions::default(), - // exit_receiver, - // ) - // .await } diff --git a/crates/sui-bridge-indexer/src/metrics.rs b/crates/sui-bridge-indexer/src/metrics.rs index 2a3f965344237..f4ccf5dfbedfc 100644 --- a/crates/sui-bridge-indexer/src/metrics.rs +++ b/crates/sui-bridge-indexer/src/metrics.rs @@ -1,7 +1,10 @@ // Copyright (c) Mysten Labs, Inc. // SPDX-License-Identifier: Apache-2.0 -use prometheus::{register_int_counter_with_registry, IntCounter, Registry}; +use prometheus::{ + register_int_counter_with_registry, register_int_gauge_with_registry, IntCounter, IntGauge, + Registry, +}; #[derive(Clone, Debug)] pub struct BridgeIndexerMetrics { @@ -14,6 +17,10 @@ pub struct BridgeIndexerMetrics { pub(crate) total_eth_token_deposited: IntCounter, pub(crate) total_eth_token_transfer_claimed: IntCounter, pub(crate) total_eth_bridge_txn_other: IntCounter, + pub(crate) last_committed_sui_checkpoint: IntGauge, + pub(crate) last_committed_eth_block: IntGauge, + pub(crate) last_synced_unfinalized_eth_block: IntGauge, + pub(crate) last_committed_unfinalized_eth_block: IntGauge, } impl BridgeIndexerMetrics { @@ -73,6 +80,30 @@ impl BridgeIndexerMetrics { registry, ) .unwrap(), + last_committed_sui_checkpoint: register_int_gauge_with_registry!( + "last_committed_sui_checkpoint", + "The latest sui checkpoint that indexer committed to DB", + registry, + ) + .unwrap(), + last_committed_eth_block: register_int_gauge_with_registry!( + "last_committed_eth_block", + "The latest eth block that indexer committed to DB", + registry, + ) + .unwrap(), + last_synced_unfinalized_eth_block: register_int_gauge_with_registry!( + "last_synced_unfinalized_eth_block", + "The latest unfinalized block that indexer synced", + registry, + ) + .unwrap(), + last_committed_unfinalized_eth_block: register_int_gauge_with_registry!( + "last_committed_unfinalized_eth_block", + "The latest unfinalized block that indexer comitted to DB", + registry, + ) + .unwrap(), } } diff --git a/crates/sui-bridge-indexer/src/migrations/00000000000000_diesel_initial_setup/up.sql b/crates/sui-bridge-indexer/src/migrations/00000000000000_diesel_initial_setup/up.sql index a477593fb6d0b..fdf75ae1ed615 100644 --- a/crates/sui-bridge-indexer/src/migrations/00000000000000_diesel_initial_setup/up.sql +++ b/crates/sui-bridge-indexer/src/migrations/00000000000000_diesel_initial_setup/up.sql @@ -42,6 +42,6 @@ CREATE TABLE progress_store CREATE TABLE sui_progress_store ( - id INT PRIMARY KEY, + id INT PRIMARY KEY, -- dummy value txn_digest bytea NOT NULL ); diff --git a/crates/sui-bridge-indexer/src/models.rs b/crates/sui-bridge-indexer/src/models.rs index 56aeecea4f925..a00c6d1bf29ca 100644 --- a/crates/sui-bridge-indexer/src/models.rs +++ b/crates/sui-bridge-indexer/src/models.rs @@ -1,7 +1,7 @@ // Copyright (c) Mysten Labs, Inc. // SPDX-License-Identifier: Apache-2.0 -use crate::schema::{progress_store, token_transfer, token_transfer_data, sui_progress_store}; +use crate::schema::{progress_store, sui_progress_store, token_transfer, token_transfer_data}; use diesel::{Identifiable, Insertable, Queryable, Selectable}; #[derive(Queryable, Selectable, Insertable, Identifiable, Debug)] diff --git a/crates/sui-bridge-indexer/src/postgres_manager.rs b/crates/sui-bridge-indexer/src/postgres_manager.rs index e92cea97fef9c..27f1917930a9a 100644 --- a/crates/sui-bridge-indexer/src/postgres_manager.rs +++ b/crates/sui-bridge-indexer/src/postgres_manager.rs @@ -36,6 +36,9 @@ pub fn get_connection_pool(database_url: String) -> PgPool { // TODO: add retry logic pub fn write(pool: &PgPool, token_txns: Vec) -> Result<(), anyhow::Error> { + if token_txns.is_empty() { + return Ok(()); + } let (transfers, data): (Vec, Vec>) = token_txns .iter() .map(|t| (t.to_db(), t.to_data_maybe())) @@ -57,7 +60,10 @@ pub fn write(pool: &PgPool, token_txns: Vec) -> Result<(), anyhow Ok(()) } -pub fn update_sui_progress_store(pool: &PgPool, tx_digest: TransactionDigest) -> Result<(), anyhow::Error> { +pub fn update_sui_progress_store( + pool: &PgPool, + tx_digest: TransactionDigest, +) -> Result<(), anyhow::Error> { let mut conn = pool.get()?; diesel::insert_into(schema::sui_progress_store::table) .values(&SuiProgressStore { @@ -78,12 +84,13 @@ pub fn read_sui_progress_store(pool: &PgPool) -> anyhow::Result Ok(Some(TransactionDigest::try_from(val.txn_digest.as_slice())?)), + Some(val) => Ok(Some(TransactionDigest::try_from( + val.txn_digest.as_slice(), + )?)), None => Ok(None), } } - pub fn get_latest_eth_token_transfer( pool: &PgPool, finalized: bool, diff --git a/crates/sui-bridge-indexer/src/schema.rs b/crates/sui-bridge-indexer/src/schema.rs index 1952393d948fc..df912443473c1 100644 --- a/crates/sui-bridge-indexer/src/schema.rs +++ b/crates/sui-bridge-indexer/src/schema.rs @@ -1,3 +1,5 @@ +// Copyright (c) Mysten Labs, Inc. +// SPDX-License-Identifier: Apache-2.0 // @generated automatically by Diesel CLI. diesel::table! { diff --git a/crates/sui-bridge-indexer/src/sui_transaction_handler.rs b/crates/sui-bridge-indexer/src/sui_transaction_handler.rs index f123234344f8e..4adb248c0f093 100644 --- a/crates/sui-bridge-indexer/src/sui_transaction_handler.rs +++ b/crates/sui-bridge-indexer/src/sui_transaction_handler.rs @@ -1,5 +1,8 @@ -use crate::postgres_manager::{update_sui_progress_store, write, PgPool, PgProgressStore}; +// Copyright (c) Mysten Labs, Inc. +// SPDX-License-Identifier: Apache-2.0 + use crate::metrics::BridgeIndexerMetrics; +use crate::postgres_manager::{update_sui_progress_store, write, PgPool}; use crate::{BridgeDataSource, TokenTransfer, TokenTransferData, TokenTransferStatus}; use anyhow::Result; use futures::StreamExt; @@ -10,9 +13,7 @@ use sui_bridge::events::{ MoveTokenDepositedEvent, MoveTokenTransferApproved, MoveTokenTransferClaimed, }; -use sui_json_rpc_types::{ - SuiTransactionBlockEffectsAPI, SuiTransactionBlockResponse, -}; +use sui_json_rpc_types::{SuiTransactionBlockEffectsAPI, SuiTransactionBlockResponse}; use sui_types::BRIDGE_ADDRESS; use tracing::{error, info}; @@ -21,7 +22,10 @@ pub(crate) const COMMIT_BATCH_SIZE: usize = 10; pub async fn handle_sui_transcations_loop( pg_pool: PgPool, - rx: mysten_metrics::metered_channel::Receiver<(Vec, Option)>, + rx: mysten_metrics::metered_channel::Receiver<( + Vec, + Option, + )>, metrics: BridgeIndexerMetrics, ) { let checkpoint_commit_batch_size = std::env::var("COMMIT_BATCH_SIZE") @@ -32,23 +36,28 @@ pub async fn handle_sui_transcations_loop( .ready_chunks(checkpoint_commit_batch_size); while let Some(batch) = stream.next().await { // unwrap: batch must not be empty - let cursor = batch.last().unwrap().1.clone(); - let token_transfers = batch.into_iter().map( + let cursor = batch.last().unwrap().1; + let token_transfers = batch + .into_iter() // TODO: letting it panic so we can capture errors, but we should handle this more gracefully - |(chunk, _)| process_transctions(chunk, &metrics).unwrap() - ).flatten().collect::>(); - // for (chunk, _) in batch { - // let token_transfers = process_transctions(resp, &metrics).unwrap(); - if !token_transfers.is_empty() { - while let Err(err) = write(&pg_pool, token_transfers.clone()) { - error!("Failed to write sui transactions to DB: {:?}", err); - tokio::time::sleep(Duration::from_secs(5)).await; - } - info!("Wrote {} token transfers to DB", token_transfers.len()); + .flat_map(|(chunk, _)| process_transctions(chunk, &metrics).unwrap()) + .collect::>(); + + // write batched token transfers to DB + if !token_transfers.is_empty() { + // unwrap: token_transfers is not empty + let last_ckp = token_transfers.last().as_ref().unwrap().block_height; + while let Err(err) = write(&pg_pool, token_transfers.clone()) { + error!("Failed to write sui transactions to DB: {:?}", err); + tokio::time::sleep(Duration::from_secs(5)).await; } - // } + info!("Wrote {} token transfers to DB", token_transfers.len()); + metrics.last_committed_sui_checkpoint.set(last_ckp as i64); + } + + // update sui progress store using the latest cursor if let Some(cursor) = cursor { - while let Err(err) = update_sui_progress_store(&pg_pool, cursor.clone()) { + while let Err(err) = update_sui_progress_store(&pg_pool, cursor) { error!("Failed to update sui progress tore DB: {:?}", err); tokio::time::sleep(Duration::from_secs(5)).await; } diff --git a/crates/sui-bridge-indexer/src/sui_transaction_queries.rs b/crates/sui-bridge-indexer/src/sui_transaction_queries.rs index 268e59baee5a3..ae046066abb55 100644 --- a/crates/sui-bridge-indexer/src/sui_transaction_queries.rs +++ b/crates/sui-bridge-indexer/src/sui_transaction_queries.rs @@ -1,15 +1,16 @@ // Copyright (c) Mysten Labs, Inc. // SPDX-License-Identifier: Apache-2.0 +use std::sync::Arc; use std::time::Duration; use sui_json_rpc_types::SuiTransactionBlockResponseOptions; use sui_json_rpc_types::TransactionFilter; use sui_json_rpc_types::{SuiTransactionBlockResponse, SuiTransactionBlockResponseQuery}; -use sui_sdk::{SuiClient, SuiClientBuilder}; +use sui_sdk::SuiClient; use sui_types::digests::TransactionDigest; use sui_types::SUI_BRIDGE_OBJECT_ID; -use sui_bridge::retry_with_max_elapsed_time; +use sui_bridge::{metrics::BridgeMetrics, retry_with_max_elapsed_time}; use tracing::{error, info}; const QUERY_DURATION: Duration = Duration::from_millis(500); @@ -17,7 +18,11 @@ const QUERY_DURATION: Duration = Duration::from_millis(500); pub async fn start_sui_tx_polling_task( sui_client: SuiClient, mut cursor: Option, - tx: mysten_metrics::metered_channel::Sender<(Vec, Option)>, + tx: mysten_metrics::metered_channel::Sender<( + Vec, + Option, + )>, + metrics: Arc, ) { info!("Starting SUI transaction polling task from {:?}", cursor); loop { @@ -37,9 +42,13 @@ pub async fn start_sui_tx_polling_task( continue; }; info!("Retrieved {} bridge transactions", results.data.len()); + let ckp_option = results.data.last().as_ref().map(|r| r.checkpoint); tx.send((results.data, results.next_cursor)) .await .expect("Failed to send transaction block to process"); + if let Some(Some(ckp)) = ckp_option { + metrics.last_synced_sui_checkpoint.set(ckp as i64); + } cursor = results.next_cursor; // tokio::time::sleep(QUERY_DURATION).await; } diff --git a/crates/sui-bridge-indexer/src/sui_worker.rs b/crates/sui-bridge-indexer/src/sui_worker.rs index 18bee9e600301..50d7fe71c4ca3 100644 --- a/crates/sui-bridge-indexer/src/sui_worker.rs +++ b/crates/sui-bridge-indexer/src/sui_worker.rs @@ -21,6 +21,7 @@ use sui_types::{ transaction::{TransactionDataAPI, TransactionKind}, BRIDGE_ADDRESS, SUI_BRIDGE_OBJECT_ID, }; +use tap::tap::TapFallible; use tracing::info; pub struct SuiBridgeWorker { @@ -188,6 +189,11 @@ impl Worker for SuiBridgeWorker { Ok::<_, anyhow::Error>(result) })?; - write(&self.pg_pool, bridge_data) + write(&self.pg_pool, bridge_data).tap_ok(|_| { + info!("Processed checkpoint [{}] successfully", checkpoint_num,); + self.metrics + .last_committed_sui_checkpoint + .set(checkpoint_num as i64); + }) } } diff --git a/crates/sui-bridge/src/eth_syncer.rs b/crates/sui-bridge/src/eth_syncer.rs index dc73e05809ddd..8767d50828ad3 100644 --- a/crates/sui-bridge/src/eth_syncer.rs +++ b/crates/sui-bridge/src/eth_syncer.rs @@ -8,6 +8,7 @@ use crate::error::BridgeResult; use crate::eth_client::EthClient; +use crate::metrics::BridgeMetrics; use crate::retry_with_max_elapsed_time; use crate::types::EthLog; use ethers::types::Address as EthAddress; @@ -45,6 +46,7 @@ where pub async fn run( self, + metrics: Arc, ) -> BridgeResult<( Vec>, mysten_metrics::metered_channel::Receiver<(EthAddress, u64, Vec)>, @@ -62,13 +64,19 @@ where watch::channel(last_finalized_block); let mut task_handles = vec![]; let eth_client_clone = self.eth_client.clone(); + let metrics_clone = metrics.clone(); task_handles.push(spawn_logged_monitored_task!( - Self::run_finalized_block_refresh_task(last_finalized_block_tx, eth_client_clone) + Self::run_finalized_block_refresh_task( + last_finalized_block_tx, + eth_client_clone, + metrics_clone + ) )); for (contract_address, start_block) in self.contract_addresses { let eth_evnets_tx_clone = eth_evnets_tx.clone(); let last_finalized_block_rx_clone = last_finalized_block_rx.clone(); let eth_client_clone = self.eth_client.clone(); + let metrics_clone = metrics.clone(); task_handles.push(spawn_logged_monitored_task!( Self::run_event_listening_task( contract_address, @@ -76,6 +84,7 @@ where last_finalized_block_rx_clone, eth_evnets_tx_clone, eth_client_clone, + metrics_clone, ) )); } @@ -85,6 +94,7 @@ where async fn run_finalized_block_refresh_task( last_finalized_block_sender: watch::Sender, eth_client: Arc>, + metrics: Arc, ) { tracing::info!("Starting finalized block refresh task."); let mut last_block_number = 0; @@ -101,6 +111,7 @@ where continue; }; tracing::debug!("Last finalized block: {}", new_value); + metrics.last_finalized_eth_block.set(new_value as i64); // TODO add a metrics for the last finalized block @@ -122,6 +133,7 @@ where mut last_finalized_block_receiver: watch::Receiver, events_sender: mysten_metrics::metered_channel::Sender<(EthAddress, u64, Vec)>, eth_client: Arc>, + metrics: Arc, ) { tracing::info!(contract_address=?contract_address, "Starting eth events listening task from block {start_block}"); let mut more_blocks = false; @@ -157,6 +169,7 @@ where continue; }; let len = events.len(); + let last_block = events.last().map(|e| e.block_number); // Note 1: we always events to the channel even when it is empty. This is because of // how `eth_getLogs` api is designed - we want cursor to move forward continuously. @@ -176,6 +189,9 @@ where "Observed {len} new Eth events", ); } + if let Some(last_block) = last_block { + metrics.last_synced_eth_block.set(last_block as i64); + } start_block = end_block + 1; } } @@ -234,7 +250,7 @@ mod tests { ); let (_handles, mut logs_rx, mut finalized_block_rx) = EthSyncer::new(Arc::new(client), addresses) - .run() + .run(Arc::new(BridgeMetrics::new_for_testing())) .await .unwrap(); @@ -324,7 +340,7 @@ mod tests { let (_handles, mut logs_rx, mut finalized_block_rx) = EthSyncer::new(Arc::new(client), addresses) - .run() + .run(Arc::new(BridgeMetrics::new_for_testing())) .await .unwrap(); @@ -461,7 +477,7 @@ mod tests { let (_handles, mut logs_rx, mut finalized_block_rx) = EthSyncer::new(Arc::new(client), addresses) - .run() + .run(Arc::new(BridgeMetrics::new_for_testing())) .await .unwrap(); diff --git a/crates/sui-bridge/src/metrics.rs b/crates/sui-bridge/src/metrics.rs index 2765b57d026e4..9f4a6d103d0cc 100644 --- a/crates/sui-bridge/src/metrics.rs +++ b/crates/sui-bridge/src/metrics.rs @@ -19,6 +19,10 @@ pub struct BridgeMetrics { pub(crate) err_requests: IntCounterVec, pub(crate) requests_inflight: IntGaugeVec, + pub last_synced_sui_checkpoint: IntGauge, + pub(crate) last_finalized_eth_block: IntGauge, + pub(crate) last_synced_eth_block: IntGauge, + pub(crate) sui_watcher_received_events: IntCounter, pub(crate) sui_watcher_received_actions: IntCounter, pub(crate) sui_watcher_unrecognized_events: IntCounter, @@ -31,7 +35,6 @@ pub struct BridgeMetrics { pub(crate) action_executor_execution_queue_received_actions: IntCounter, pub(crate) eth_provider_queries: IntCounter, - pub(crate) gas_coin_balance: IntGauge, } @@ -168,6 +171,24 @@ impl BridgeMetrics { registry, ) .unwrap(), + last_synced_sui_checkpoint: register_int_gauge_with_registry!( + "last_synced_sui_checkpoint", + "The latest sui checkpoint that indexer synced", + registry, + ) + .unwrap(), + last_synced_eth_block: register_int_gauge_with_registry!( + "bridge_last_synced_eth_block", + "The latest finalized eth block that indexer synced", + registry, + ) + .unwrap(), + last_finalized_eth_block: register_int_gauge_with_registry!( + "bridge_last_finalized_eth_block", + "The latest finalized eth block that indexer observed", + registry, + ) + .unwrap(), } } diff --git a/crates/sui-bridge/src/node.rs b/crates/sui-bridge/src/node.rs index 3ce44ad64c11e..fded514880d77 100644 --- a/crates/sui-bridge/src/node.rs +++ b/crates/sui-bridge/src/node.rs @@ -85,7 +85,7 @@ async fn start_client_components( let mut all_handles = vec![]; let (task_handles, eth_events_rx, _) = EthSyncer::new(client_config.eth_client.clone(), eth_contracts_to_watch) - .run() + .run(metrics.clone()) .await .expect("Failed to start eth syncer"); all_handles.extend(task_handles); From 6839973b8b5d98db42d4ef7126ce537c1f8f6c55 Mon Sep 17 00:00:00 2001 From: longbowlu Date: Tue, 25 Jun 2024 22:14:53 -0700 Subject: [PATCH 3/3] handle empty checkpoints --- .../src/latest_eth_syncer.rs | 11 ++++- crates/sui-bridge-indexer/src/lib.rs | 1 + .../src/sui_transaction_handler.rs | 36 ++++++---------- .../src/sui_transaction_queries.rs | 42 +++++++++++++++---- crates/sui-bridge-indexer/src/types.rs | 36 ++++++++++++++++ crates/sui-bridge/src/eth_syncer.rs | 10 ++++- 6 files changed, 101 insertions(+), 35 deletions(-) create mode 100644 crates/sui-bridge-indexer/src/types.rs diff --git a/crates/sui-bridge-indexer/src/latest_eth_syncer.rs b/crates/sui-bridge-indexer/src/latest_eth_syncer.rs index 1cd17494ad8bc..f48fd716ce1c0 100644 --- a/crates/sui-bridge-indexer/src/latest_eth_syncer.rs +++ b/crates/sui-bridge-indexer/src/latest_eth_syncer.rs @@ -11,13 +11,14 @@ use ethers::types::Address as EthAddress; use mysten_metrics::spawn_logged_monitored_task; use std::collections::HashMap; use std::sync::Arc; +use std::time::Instant; use sui_bridge::error::BridgeResult; use sui_bridge::eth_client::EthClient; use sui_bridge::retry_with_max_elapsed_time; use sui_bridge::types::EthLog; use tokio::task::JoinHandle; use tokio::time::{self, Duration}; -use tracing::error; +use tracing::{error, info}; use crate::metrics::BridgeIndexerMetrics; @@ -121,6 +122,7 @@ where // Each query does at most ETH_LOG_QUERY_MAX_BLOCK_RANGE blocks. let end_block = std::cmp::min(start_block + ETH_LOG_QUERY_MAX_BLOCK_RANGE - 1, new_block); + let timer = Instant::now(); let Ok(Ok(events)) = retry_with_max_elapsed_time!( eth_client.get_events_in_range(contract_address, start_block, end_block), Duration::from_secs(30) @@ -128,6 +130,13 @@ where error!("Failed to get events from eth client after retry"); continue; }; + info!( + ?contract_address, + start_block, + end_block, + "Querying eth events took {:?}", + timer.elapsed() + ); let len = events.len(); let last_block = events.last().map(|e| e.block_number); diff --git a/crates/sui-bridge-indexer/src/lib.rs b/crates/sui-bridge-indexer/src/lib.rs index 8fa9a7585d895..7bbe827374740 100644 --- a/crates/sui-bridge-indexer/src/lib.rs +++ b/crates/sui-bridge-indexer/src/lib.rs @@ -15,6 +15,7 @@ pub mod schema; pub mod sui_transaction_handler; pub mod sui_transaction_queries; pub mod sui_worker; +pub mod types; #[derive(Clone)] pub struct TokenTransfer { diff --git a/crates/sui-bridge-indexer/src/sui_transaction_handler.rs b/crates/sui-bridge-indexer/src/sui_transaction_handler.rs index 4adb248c0f093..8e852a09c2b8c 100644 --- a/crates/sui-bridge-indexer/src/sui_transaction_handler.rs +++ b/crates/sui-bridge-indexer/src/sui_transaction_handler.rs @@ -3,6 +3,7 @@ use crate::metrics::BridgeIndexerMetrics; use crate::postgres_manager::{update_sui_progress_store, write, PgPool}; +use crate::types::RetrievedTransaction; use crate::{BridgeDataSource, TokenTransfer, TokenTransferData, TokenTransferStatus}; use anyhow::Result; use futures::StreamExt; @@ -13,7 +14,7 @@ use sui_bridge::events::{ MoveTokenDepositedEvent, MoveTokenTransferApproved, MoveTokenTransferClaimed, }; -use sui_json_rpc_types::{SuiTransactionBlockEffectsAPI, SuiTransactionBlockResponse}; +use sui_json_rpc_types::SuiTransactionBlockEffectsAPI; use sui_types::BRIDGE_ADDRESS; use tracing::{error, info}; @@ -23,7 +24,7 @@ pub(crate) const COMMIT_BATCH_SIZE: usize = 10; pub async fn handle_sui_transcations_loop( pg_pool: PgPool, rx: mysten_metrics::metered_channel::Receiver<( - Vec, + Vec, Option, )>, metrics: BridgeIndexerMetrics, @@ -68,38 +69,25 @@ pub async fn handle_sui_transcations_loop( } fn process_transctions( - resp: Vec, + txes: Vec, metrics: &BridgeIndexerMetrics, ) -> Result> { - resp.into_iter() - .map(|r| into_token_transfers(r, metrics)) + txes.into_iter() + .map(|tx| into_token_transfers(tx, metrics)) .collect::>>() .map(|v| v.into_iter().flatten().collect()) } pub fn into_token_transfers( - resp: SuiTransactionBlockResponse, + tx: RetrievedTransaction, metrics: &BridgeIndexerMetrics, ) -> Result> { let mut transfers = Vec::new(); - let tx_digest = resp.digest; - let events = resp.events.ok_or(anyhow::anyhow!( - "Expected events in SuiTransactionBlockResponse: {:?}", - tx_digest - ))?; - let checkpoint_num = resp.checkpoint.ok_or(anyhow::anyhow!( - "Expected checkpoint in SuiTransactionBlockResponse: {:?}", - tx_digest - ))?; - let timestamp_ms = resp.timestamp_ms.ok_or(anyhow::anyhow!( - "Expected timestamp_ms in SuiTransactionBlockResponse: {:?}", - tx_digest - ))?; - let effects = resp.effects.ok_or(anyhow::anyhow!( - "Expected effects in SuiTransactionBlockResponse: {:?}", - tx_digest - ))?; - for ev in events.data { + let tx_digest = tx.tx_digest; + let timestamp_ms = tx.timestamp_ms; + let checkpoint_num = tx.checkpoint; + let effects = tx.effects; + for ev in tx.events.data { if ev.type_.address != BRIDGE_ADDRESS { continue; } diff --git a/crates/sui-bridge-indexer/src/sui_transaction_queries.rs b/crates/sui-bridge-indexer/src/sui_transaction_queries.rs index ae046066abb55..beb28eaaa6709 100644 --- a/crates/sui-bridge-indexer/src/sui_transaction_queries.rs +++ b/crates/sui-bridge-indexer/src/sui_transaction_queries.rs @@ -4,8 +4,8 @@ use std::sync::Arc; use std::time::Duration; use sui_json_rpc_types::SuiTransactionBlockResponseOptions; +use sui_json_rpc_types::SuiTransactionBlockResponseQuery; use sui_json_rpc_types::TransactionFilter; -use sui_json_rpc_types::{SuiTransactionBlockResponse, SuiTransactionBlockResponseQuery}; use sui_sdk::SuiClient; use sui_types::digests::TransactionDigest; use sui_types::SUI_BRIDGE_OBJECT_ID; @@ -13,13 +13,16 @@ use sui_types::SUI_BRIDGE_OBJECT_ID; use sui_bridge::{metrics::BridgeMetrics, retry_with_max_elapsed_time}; use tracing::{error, info}; -const QUERY_DURATION: Duration = Duration::from_millis(500); +use crate::types::RetrievedTransaction; + +const QUERY_DURATION: Duration = Duration::from_secs(1); +const SLEEP_DURATION: Duration = Duration::from_secs(5); pub async fn start_sui_tx_polling_task( sui_client: SuiClient, mut cursor: Option, tx: mysten_metrics::metered_channel::Sender<( - Vec, + Vec, Option, )>, metrics: Arc, @@ -42,14 +45,35 @@ pub async fn start_sui_tx_polling_task( continue; }; info!("Retrieved {} bridge transactions", results.data.len()); - let ckp_option = results.data.last().as_ref().map(|r| r.checkpoint); - tx.send((results.data, results.next_cursor)) + let txes = match results + .data + .into_iter() + .map(RetrievedTransaction::try_from) + .collect::>>() + { + Ok(data) => data, + Err(e) => { + // TOOD: Sometimes fullnode does not return checkpoint strangely. We retry instead of + // panicking. + error!( + "Failed to convert retrieved transactions to sanitized format: {}", + e + ); + tokio::time::sleep(SLEEP_DURATION).await; + continue; + } + }; + if txes.is_empty() { + // When there is no more new data, we are caught up, no need to stress the fullnode + tokio::time::sleep(QUERY_DURATION).await; + continue; + } + // Unwrap: txes is not empty + let ckp = txes.last().unwrap().checkpoint; + tx.send((txes, results.next_cursor)) .await .expect("Failed to send transaction block to process"); - if let Some(Some(ckp)) = ckp_option { - metrics.last_synced_sui_checkpoint.set(ckp as i64); - } + metrics.last_synced_sui_checkpoint.set(ckp as i64); cursor = results.next_cursor; - // tokio::time::sleep(QUERY_DURATION).await; } } diff --git a/crates/sui-bridge-indexer/src/types.rs b/crates/sui-bridge-indexer/src/types.rs new file mode 100644 index 0000000000000..d6ea0331d1919 --- /dev/null +++ b/crates/sui-bridge-indexer/src/types.rs @@ -0,0 +1,36 @@ +// Copyright (c) Mysten Labs, Inc. +// SPDX-License-Identifier: Apache-2.0 + +use sui_json_rpc_types::{ + SuiTransactionBlockEffects, SuiTransactionBlockEvents, SuiTransactionBlockResponse, +}; +use sui_types::digests::TransactionDigest; + +pub struct RetrievedTransaction { + pub tx_digest: TransactionDigest, + pub events: SuiTransactionBlockEvents, + pub checkpoint: u64, + pub timestamp_ms: u64, + pub effects: SuiTransactionBlockEffects, +} + +impl TryFrom for RetrievedTransaction { + type Error = anyhow::Error; + fn try_from(response: SuiTransactionBlockResponse) -> Result { + Ok(RetrievedTransaction { + tx_digest: response.digest, + events: response + .events + .ok_or(anyhow::anyhow!("missing events in responses"))?, + checkpoint: response + .checkpoint + .ok_or(anyhow::anyhow!("missing checkpoint in responses"))?, + timestamp_ms: response + .timestamp_ms + .ok_or(anyhow::anyhow!("missing timestamp_ms in responses"))?, + effects: response + .effects + .ok_or(anyhow::anyhow!("missing effects in responses"))?, + }) + } +} diff --git a/crates/sui-bridge/src/eth_syncer.rs b/crates/sui-bridge/src/eth_syncer.rs index 8767d50828ad3..e52461fbdbc76 100644 --- a/crates/sui-bridge/src/eth_syncer.rs +++ b/crates/sui-bridge/src/eth_syncer.rs @@ -17,7 +17,7 @@ use std::collections::HashMap; use std::sync::Arc; use tokio::sync::watch; use tokio::task::JoinHandle; -use tokio::time::{self, Duration}; +use tokio::time::{self, Duration, Instant}; use tracing::error; const ETH_LOG_QUERY_MAX_BLOCK_RANGE: u64 = 1000; @@ -161,6 +161,7 @@ where new_finalized_block, ); more_blocks = end_block < new_finalized_block; + let timer = Instant::now(); let Ok(Ok(events)) = retry_with_max_elapsed_time!( eth_client.get_events_in_range(contract_address, start_block, end_block), Duration::from_secs(600) @@ -168,6 +169,13 @@ where error!("Failed to get events from eth client after retry"); continue; }; + tracing::info!( + ?contract_address, + start_block, + end_block, + "Querying eth events took {:?}", + timer.elapsed() + ); let len = events.len(); let last_block = events.last().map(|e| e.block_number);