diff --git a/crates/sui-bridge-indexer/src/latest_eth_syncer.rs b/crates/sui-bridge-indexer/src/latest_eth_syncer.rs index 1cd17494ad8bc..f48fd716ce1c0 100644 --- a/crates/sui-bridge-indexer/src/latest_eth_syncer.rs +++ b/crates/sui-bridge-indexer/src/latest_eth_syncer.rs @@ -11,13 +11,14 @@ use ethers::types::Address as EthAddress; use mysten_metrics::spawn_logged_monitored_task; use std::collections::HashMap; use std::sync::Arc; +use std::time::Instant; use sui_bridge::error::BridgeResult; use sui_bridge::eth_client::EthClient; use sui_bridge::retry_with_max_elapsed_time; use sui_bridge::types::EthLog; use tokio::task::JoinHandle; use tokio::time::{self, Duration}; -use tracing::error; +use tracing::{error, info}; use crate::metrics::BridgeIndexerMetrics; @@ -121,6 +122,7 @@ where // Each query does at most ETH_LOG_QUERY_MAX_BLOCK_RANGE blocks. let end_block = std::cmp::min(start_block + ETH_LOG_QUERY_MAX_BLOCK_RANGE - 1, new_block); + let timer = Instant::now(); let Ok(Ok(events)) = retry_with_max_elapsed_time!( eth_client.get_events_in_range(contract_address, start_block, end_block), Duration::from_secs(30) @@ -128,6 +130,13 @@ where error!("Failed to get events from eth client after retry"); continue; }; + info!( + ?contract_address, + start_block, + end_block, + "Querying eth events took {:?}", + timer.elapsed() + ); let len = events.len(); let last_block = events.last().map(|e| e.block_number); diff --git a/crates/sui-bridge-indexer/src/lib.rs b/crates/sui-bridge-indexer/src/lib.rs index 8fa9a7585d895..7bbe827374740 100644 --- a/crates/sui-bridge-indexer/src/lib.rs +++ b/crates/sui-bridge-indexer/src/lib.rs @@ -15,6 +15,7 @@ pub mod schema; pub mod sui_transaction_handler; pub mod sui_transaction_queries; pub mod sui_worker; +pub mod types; #[derive(Clone)] pub struct TokenTransfer { diff --git a/crates/sui-bridge-indexer/src/sui_transaction_handler.rs b/crates/sui-bridge-indexer/src/sui_transaction_handler.rs index 4adb248c0f093..8e852a09c2b8c 100644 --- a/crates/sui-bridge-indexer/src/sui_transaction_handler.rs +++ b/crates/sui-bridge-indexer/src/sui_transaction_handler.rs @@ -3,6 +3,7 @@ use crate::metrics::BridgeIndexerMetrics; use crate::postgres_manager::{update_sui_progress_store, write, PgPool}; +use crate::types::RetrievedTransaction; use crate::{BridgeDataSource, TokenTransfer, TokenTransferData, TokenTransferStatus}; use anyhow::Result; use futures::StreamExt; @@ -13,7 +14,7 @@ use sui_bridge::events::{ MoveTokenDepositedEvent, MoveTokenTransferApproved, MoveTokenTransferClaimed, }; -use sui_json_rpc_types::{SuiTransactionBlockEffectsAPI, SuiTransactionBlockResponse}; +use sui_json_rpc_types::SuiTransactionBlockEffectsAPI; use sui_types::BRIDGE_ADDRESS; use tracing::{error, info}; @@ -23,7 +24,7 @@ pub(crate) const COMMIT_BATCH_SIZE: usize = 10; pub async fn handle_sui_transcations_loop( pg_pool: PgPool, rx: mysten_metrics::metered_channel::Receiver<( - Vec, + Vec, Option, )>, metrics: BridgeIndexerMetrics, @@ -68,38 +69,25 @@ pub async fn handle_sui_transcations_loop( } fn process_transctions( - resp: Vec, + txes: Vec, metrics: &BridgeIndexerMetrics, ) -> Result> { - resp.into_iter() - .map(|r| into_token_transfers(r, metrics)) + txes.into_iter() + .map(|tx| into_token_transfers(tx, metrics)) .collect::>>() .map(|v| v.into_iter().flatten().collect()) } pub fn into_token_transfers( - resp: SuiTransactionBlockResponse, + tx: RetrievedTransaction, metrics: &BridgeIndexerMetrics, ) -> Result> { let mut transfers = Vec::new(); - let tx_digest = resp.digest; - let events = resp.events.ok_or(anyhow::anyhow!( - "Expected events in SuiTransactionBlockResponse: {:?}", - tx_digest - ))?; - let checkpoint_num = resp.checkpoint.ok_or(anyhow::anyhow!( - "Expected checkpoint in SuiTransactionBlockResponse: {:?}", - tx_digest - ))?; - let timestamp_ms = resp.timestamp_ms.ok_or(anyhow::anyhow!( - "Expected timestamp_ms in SuiTransactionBlockResponse: {:?}", - tx_digest - ))?; - let effects = resp.effects.ok_or(anyhow::anyhow!( - "Expected effects in SuiTransactionBlockResponse: {:?}", - tx_digest - ))?; - for ev in events.data { + let tx_digest = tx.tx_digest; + let timestamp_ms = tx.timestamp_ms; + let checkpoint_num = tx.checkpoint; + let effects = tx.effects; + for ev in tx.events.data { if ev.type_.address != BRIDGE_ADDRESS { continue; } diff --git a/crates/sui-bridge-indexer/src/sui_transaction_queries.rs b/crates/sui-bridge-indexer/src/sui_transaction_queries.rs index ae046066abb55..beb28eaaa6709 100644 --- a/crates/sui-bridge-indexer/src/sui_transaction_queries.rs +++ b/crates/sui-bridge-indexer/src/sui_transaction_queries.rs @@ -4,8 +4,8 @@ use std::sync::Arc; use std::time::Duration; use sui_json_rpc_types::SuiTransactionBlockResponseOptions; +use sui_json_rpc_types::SuiTransactionBlockResponseQuery; use sui_json_rpc_types::TransactionFilter; -use sui_json_rpc_types::{SuiTransactionBlockResponse, SuiTransactionBlockResponseQuery}; use sui_sdk::SuiClient; use sui_types::digests::TransactionDigest; use sui_types::SUI_BRIDGE_OBJECT_ID; @@ -13,13 +13,16 @@ use sui_types::SUI_BRIDGE_OBJECT_ID; use sui_bridge::{metrics::BridgeMetrics, retry_with_max_elapsed_time}; use tracing::{error, info}; -const QUERY_DURATION: Duration = Duration::from_millis(500); +use crate::types::RetrievedTransaction; + +const QUERY_DURATION: Duration = Duration::from_secs(1); +const SLEEP_DURATION: Duration = Duration::from_secs(5); pub async fn start_sui_tx_polling_task( sui_client: SuiClient, mut cursor: Option, tx: mysten_metrics::metered_channel::Sender<( - Vec, + Vec, Option, )>, metrics: Arc, @@ -42,14 +45,35 @@ pub async fn start_sui_tx_polling_task( continue; }; info!("Retrieved {} bridge transactions", results.data.len()); - let ckp_option = results.data.last().as_ref().map(|r| r.checkpoint); - tx.send((results.data, results.next_cursor)) + let txes = match results + .data + .into_iter() + .map(RetrievedTransaction::try_from) + .collect::>>() + { + Ok(data) => data, + Err(e) => { + // TOOD: Sometimes fullnode does not return checkpoint strangely. We retry instead of + // panicking. + error!( + "Failed to convert retrieved transactions to sanitized format: {}", + e + ); + tokio::time::sleep(SLEEP_DURATION).await; + continue; + } + }; + if txes.is_empty() { + // When there is no more new data, we are caught up, no need to stress the fullnode + tokio::time::sleep(QUERY_DURATION).await; + continue; + } + // Unwrap: txes is not empty + let ckp = txes.last().unwrap().checkpoint; + tx.send((txes, results.next_cursor)) .await .expect("Failed to send transaction block to process"); - if let Some(Some(ckp)) = ckp_option { - metrics.last_synced_sui_checkpoint.set(ckp as i64); - } + metrics.last_synced_sui_checkpoint.set(ckp as i64); cursor = results.next_cursor; - // tokio::time::sleep(QUERY_DURATION).await; } } diff --git a/crates/sui-bridge-indexer/src/types.rs b/crates/sui-bridge-indexer/src/types.rs new file mode 100644 index 0000000000000..d6ea0331d1919 --- /dev/null +++ b/crates/sui-bridge-indexer/src/types.rs @@ -0,0 +1,36 @@ +// Copyright (c) Mysten Labs, Inc. +// SPDX-License-Identifier: Apache-2.0 + +use sui_json_rpc_types::{ + SuiTransactionBlockEffects, SuiTransactionBlockEvents, SuiTransactionBlockResponse, +}; +use sui_types::digests::TransactionDigest; + +pub struct RetrievedTransaction { + pub tx_digest: TransactionDigest, + pub events: SuiTransactionBlockEvents, + pub checkpoint: u64, + pub timestamp_ms: u64, + pub effects: SuiTransactionBlockEffects, +} + +impl TryFrom for RetrievedTransaction { + type Error = anyhow::Error; + fn try_from(response: SuiTransactionBlockResponse) -> Result { + Ok(RetrievedTransaction { + tx_digest: response.digest, + events: response + .events + .ok_or(anyhow::anyhow!("missing events in responses"))?, + checkpoint: response + .checkpoint + .ok_or(anyhow::anyhow!("missing checkpoint in responses"))?, + timestamp_ms: response + .timestamp_ms + .ok_or(anyhow::anyhow!("missing timestamp_ms in responses"))?, + effects: response + .effects + .ok_or(anyhow::anyhow!("missing effects in responses"))?, + }) + } +} diff --git a/crates/sui-bridge/src/eth_syncer.rs b/crates/sui-bridge/src/eth_syncer.rs index 8767d50828ad3..e52461fbdbc76 100644 --- a/crates/sui-bridge/src/eth_syncer.rs +++ b/crates/sui-bridge/src/eth_syncer.rs @@ -17,7 +17,7 @@ use std::collections::HashMap; use std::sync::Arc; use tokio::sync::watch; use tokio::task::JoinHandle; -use tokio::time::{self, Duration}; +use tokio::time::{self, Duration, Instant}; use tracing::error; const ETH_LOG_QUERY_MAX_BLOCK_RANGE: u64 = 1000; @@ -161,6 +161,7 @@ where new_finalized_block, ); more_blocks = end_block < new_finalized_block; + let timer = Instant::now(); let Ok(Ok(events)) = retry_with_max_elapsed_time!( eth_client.get_events_in_range(contract_address, start_block, end_block), Duration::from_secs(600) @@ -168,6 +169,13 @@ where error!("Failed to get events from eth client after retry"); continue; }; + tracing::info!( + ?contract_address, + start_block, + end_block, + "Querying eth events took {:?}", + timer.elapsed() + ); let len = events.len(); let last_block = events.last().map(|e| e.block_number);