Skip to content

Commit

Permalink
handle empty checkpoints
Browse files Browse the repository at this point in the history
  • Loading branch information
longbowlu committed Jun 26, 2024
1 parent 9557fc2 commit 6839973
Show file tree
Hide file tree
Showing 6 changed files with 101 additions and 35 deletions.
11 changes: 10 additions & 1 deletion crates/sui-bridge-indexer/src/latest_eth_syncer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,14 @@ use ethers::types::Address as EthAddress;
use mysten_metrics::spawn_logged_monitored_task;
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Instant;
use sui_bridge::error::BridgeResult;
use sui_bridge::eth_client::EthClient;
use sui_bridge::retry_with_max_elapsed_time;
use sui_bridge::types::EthLog;
use tokio::task::JoinHandle;
use tokio::time::{self, Duration};
use tracing::error;
use tracing::{error, info};

use crate::metrics::BridgeIndexerMetrics;

Expand Down Expand Up @@ -121,13 +122,21 @@ where
// Each query does at most ETH_LOG_QUERY_MAX_BLOCK_RANGE blocks.
let end_block =
std::cmp::min(start_block + ETH_LOG_QUERY_MAX_BLOCK_RANGE - 1, new_block);
let timer = Instant::now();
let Ok(Ok(events)) = retry_with_max_elapsed_time!(
eth_client.get_events_in_range(contract_address, start_block, end_block),
Duration::from_secs(30)
) else {
error!("Failed to get events from eth client after retry");
continue;
};
info!(
?contract_address,
start_block,
end_block,
"Querying eth events took {:?}",
timer.elapsed()
);
let len = events.len();
let last_block = events.last().map(|e| e.block_number);

Expand Down
1 change: 1 addition & 0 deletions crates/sui-bridge-indexer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ pub mod schema;
pub mod sui_transaction_handler;
pub mod sui_transaction_queries;
pub mod sui_worker;
pub mod types;

#[derive(Clone)]
pub struct TokenTransfer {
Expand Down
36 changes: 12 additions & 24 deletions crates/sui-bridge-indexer/src/sui_transaction_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

use crate::metrics::BridgeIndexerMetrics;
use crate::postgres_manager::{update_sui_progress_store, write, PgPool};
use crate::types::RetrievedTransaction;
use crate::{BridgeDataSource, TokenTransfer, TokenTransferData, TokenTransferStatus};
use anyhow::Result;
use futures::StreamExt;
Expand All @@ -13,7 +14,7 @@ use sui_bridge::events::{
MoveTokenDepositedEvent, MoveTokenTransferApproved, MoveTokenTransferClaimed,
};

use sui_json_rpc_types::{SuiTransactionBlockEffectsAPI, SuiTransactionBlockResponse};
use sui_json_rpc_types::SuiTransactionBlockEffectsAPI;

use sui_types::BRIDGE_ADDRESS;
use tracing::{error, info};
Expand All @@ -23,7 +24,7 @@ pub(crate) const COMMIT_BATCH_SIZE: usize = 10;
pub async fn handle_sui_transcations_loop(
pg_pool: PgPool,
rx: mysten_metrics::metered_channel::Receiver<(
Vec<SuiTransactionBlockResponse>,
Vec<RetrievedTransaction>,
Option<TransactionDigest>,
)>,
metrics: BridgeIndexerMetrics,
Expand Down Expand Up @@ -68,38 +69,25 @@ pub async fn handle_sui_transcations_loop(
}

fn process_transctions(
resp: Vec<SuiTransactionBlockResponse>,
txes: Vec<RetrievedTransaction>,
metrics: &BridgeIndexerMetrics,
) -> Result<Vec<TokenTransfer>> {
resp.into_iter()
.map(|r| into_token_transfers(r, metrics))
txes.into_iter()
.map(|tx| into_token_transfers(tx, metrics))
.collect::<Result<Vec<_>>>()
.map(|v| v.into_iter().flatten().collect())
}

pub fn into_token_transfers(
resp: SuiTransactionBlockResponse,
tx: RetrievedTransaction,
metrics: &BridgeIndexerMetrics,
) -> Result<Vec<TokenTransfer>> {
let mut transfers = Vec::new();
let tx_digest = resp.digest;
let events = resp.events.ok_or(anyhow::anyhow!(
"Expected events in SuiTransactionBlockResponse: {:?}",
tx_digest
))?;
let checkpoint_num = resp.checkpoint.ok_or(anyhow::anyhow!(
"Expected checkpoint in SuiTransactionBlockResponse: {:?}",
tx_digest
))?;
let timestamp_ms = resp.timestamp_ms.ok_or(anyhow::anyhow!(
"Expected timestamp_ms in SuiTransactionBlockResponse: {:?}",
tx_digest
))?;
let effects = resp.effects.ok_or(anyhow::anyhow!(
"Expected effects in SuiTransactionBlockResponse: {:?}",
tx_digest
))?;
for ev in events.data {
let tx_digest = tx.tx_digest;
let timestamp_ms = tx.timestamp_ms;
let checkpoint_num = tx.checkpoint;
let effects = tx.effects;
for ev in tx.events.data {
if ev.type_.address != BRIDGE_ADDRESS {
continue;
}
Expand Down
42 changes: 33 additions & 9 deletions crates/sui-bridge-indexer/src/sui_transaction_queries.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,22 +4,25 @@
use std::sync::Arc;
use std::time::Duration;
use sui_json_rpc_types::SuiTransactionBlockResponseOptions;
use sui_json_rpc_types::SuiTransactionBlockResponseQuery;
use sui_json_rpc_types::TransactionFilter;
use sui_json_rpc_types::{SuiTransactionBlockResponse, SuiTransactionBlockResponseQuery};
use sui_sdk::SuiClient;
use sui_types::digests::TransactionDigest;
use sui_types::SUI_BRIDGE_OBJECT_ID;

use sui_bridge::{metrics::BridgeMetrics, retry_with_max_elapsed_time};
use tracing::{error, info};

const QUERY_DURATION: Duration = Duration::from_millis(500);
use crate::types::RetrievedTransaction;

const QUERY_DURATION: Duration = Duration::from_secs(1);
const SLEEP_DURATION: Duration = Duration::from_secs(5);

pub async fn start_sui_tx_polling_task(
sui_client: SuiClient,
mut cursor: Option<TransactionDigest>,
tx: mysten_metrics::metered_channel::Sender<(
Vec<SuiTransactionBlockResponse>,
Vec<RetrievedTransaction>,
Option<TransactionDigest>,
)>,
metrics: Arc<BridgeMetrics>,
Expand All @@ -42,14 +45,35 @@ pub async fn start_sui_tx_polling_task(
continue;
};
info!("Retrieved {} bridge transactions", results.data.len());
let ckp_option = results.data.last().as_ref().map(|r| r.checkpoint);
tx.send((results.data, results.next_cursor))
let txes = match results
.data
.into_iter()
.map(RetrievedTransaction::try_from)
.collect::<anyhow::Result<Vec<_>>>()
{
Ok(data) => data,
Err(e) => {
// TOOD: Sometimes fullnode does not return checkpoint strangely. We retry instead of
// panicking.
error!(
"Failed to convert retrieved transactions to sanitized format: {}",
e
);
tokio::time::sleep(SLEEP_DURATION).await;
continue;
}
};
if txes.is_empty() {
// When there is no more new data, we are caught up, no need to stress the fullnode
tokio::time::sleep(QUERY_DURATION).await;
continue;
}
// Unwrap: txes is not empty
let ckp = txes.last().unwrap().checkpoint;
tx.send((txes, results.next_cursor))
.await
.expect("Failed to send transaction block to process");
if let Some(Some(ckp)) = ckp_option {
metrics.last_synced_sui_checkpoint.set(ckp as i64);
}
metrics.last_synced_sui_checkpoint.set(ckp as i64);
cursor = results.next_cursor;
// tokio::time::sleep(QUERY_DURATION).await;
}
}
36 changes: 36 additions & 0 deletions crates/sui-bridge-indexer/src/types.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use sui_json_rpc_types::{
SuiTransactionBlockEffects, SuiTransactionBlockEvents, SuiTransactionBlockResponse,
};
use sui_types::digests::TransactionDigest;

pub struct RetrievedTransaction {
pub tx_digest: TransactionDigest,
pub events: SuiTransactionBlockEvents,
pub checkpoint: u64,
pub timestamp_ms: u64,
pub effects: SuiTransactionBlockEffects,
}

impl TryFrom<SuiTransactionBlockResponse> for RetrievedTransaction {
type Error = anyhow::Error;
fn try_from(response: SuiTransactionBlockResponse) -> Result<Self, Self::Error> {
Ok(RetrievedTransaction {
tx_digest: response.digest,
events: response
.events
.ok_or(anyhow::anyhow!("missing events in responses"))?,
checkpoint: response
.checkpoint
.ok_or(anyhow::anyhow!("missing checkpoint in responses"))?,
timestamp_ms: response
.timestamp_ms
.ok_or(anyhow::anyhow!("missing timestamp_ms in responses"))?,
effects: response
.effects
.ok_or(anyhow::anyhow!("missing effects in responses"))?,
})
}
}
10 changes: 9 additions & 1 deletion crates/sui-bridge/src/eth_syncer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::watch;
use tokio::task::JoinHandle;
use tokio::time::{self, Duration};
use tokio::time::{self, Duration, Instant};
use tracing::error;

const ETH_LOG_QUERY_MAX_BLOCK_RANGE: u64 = 1000;
Expand Down Expand Up @@ -161,13 +161,21 @@ where
new_finalized_block,
);
more_blocks = end_block < new_finalized_block;
let timer = Instant::now();
let Ok(Ok(events)) = retry_with_max_elapsed_time!(
eth_client.get_events_in_range(contract_address, start_block, end_block),
Duration::from_secs(600)
) else {
error!("Failed to get events from eth client after retry");
continue;
};
tracing::info!(
?contract_address,
start_block,
end_block,
"Querying eth events took {:?}",
timer.elapsed()
);
let len = events.len();
let last_block = events.last().map(|e| e.block_number);

Expand Down

0 comments on commit 6839973

Please sign in to comment.