Skip to content

Commit

Permalink
[bridge indexer] query transactions (MystenLabs#18400)
Browse files Browse the repository at this point in the history
## Description 

Describe the changes or additions included in this PR.

## Test plan 

How did you test the new or updated feature?

---

## Release notes

Check each box that your changes affect. If none of the boxes relate to
your changes, release notes aren't required.

For each box you select, include information after the relevant heading
that describes the impact of your changes that a user might notice and
any actions they must take to implement updates.

- [ ] Protocol: 
- [ ] Nodes (Validators and Full nodes): 
- [ ] Indexer: 
- [ ] JSON-RPC: 
- [ ] GraphQL: 
- [ ] CLI: 
- [ ] Rust SDK:
  • Loading branch information
longbowlu authored and tx-tomcat committed Jul 29, 2024
1 parent 0347069 commit 3ecaaa6
Show file tree
Hide file tree
Showing 19 changed files with 559 additions and 29 deletions.
4 changes: 4 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions crates/sui-bridge-indexer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,12 @@ edition = "2021"

[dependencies]
serde.workspace = true
tap.workspace = true
diesel = { version = "2.1.4", features = ["postgres", "r2d2", "serde_json"] }
ethers = "2.0"
tokio = { workspace = true, features = ["full"] }
anyhow.workspace = true
futures.workspace = true
async-trait.workspace = true
bcs.workspace = true
bin-version.workspace = true
Expand All @@ -20,6 +22,8 @@ mysten-metrics.workspace = true
prometheus.workspace = true
serde_yaml.workspace = true
sui-bridge.workspace = true
sui-sdk.workspace = true
sui-json-rpc-types.workspace = true
sui-data-ingestion-core.workspace = true
sui-types.workspace = true
telemetry-subscribers.workspace = true
Expand Down
1 change: 1 addition & 0 deletions crates/sui-bridge-indexer/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ pub struct Config {
pub start_block: u64,
pub metric_url: String,
pub metric_port: u16,
pub sui_rpc_url: Option<String>,
}

/// Load the config to run.
Expand Down
18 changes: 15 additions & 3 deletions crates/sui-bridge-indexer/src/eth_worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,18 @@ use std::collections::HashMap;
use std::str::FromStr;
use std::sync::Arc;
use sui_bridge::abi::{EthBridgeEvent, EthSuiBridgeEvents};
use sui_bridge::metrics::BridgeMetrics;
use sui_bridge::types::EthLog;
use sui_bridge::{eth_client::EthClient, eth_syncer::EthSyncer};
use tokio::task::JoinHandle;
use tracing::info;
use tracing::log::error;

#[derive(Clone, Debug)]
#[derive(Clone)]
pub struct EthBridgeWorker {
provider: Arc<Provider<Http>>,
pg_pool: PgPool,
bridge_metrics: Arc<BridgeMetrics>,
metrics: BridgeIndexerMetrics,
bridge_address: EthAddress,
config: Config,
Expand All @@ -34,6 +36,7 @@ pub struct EthBridgeWorker {
impl EthBridgeWorker {
pub fn new(
pg_pool: PgPool,
bridge_metrics: Arc<BridgeMetrics>,
metrics: BridgeIndexerMetrics,
config: Config,
) -> Result<Self, Box<dyn std::error::Error>> {
Expand All @@ -47,6 +50,7 @@ impl EthBridgeWorker {
Ok(Self {
provider,
pg_pool,
bridge_metrics,
metrics,
bridge_address,
config,
Expand All @@ -69,7 +73,7 @@ impl EthBridgeWorker {

let (_task_handles, eth_events_rx, _) =
EthSyncer::new(eth_client, finalized_contract_addresses)
.run()
.run(self.bridge_metrics.clone())
.await
.map_err(|e| anyhow::anyhow!(format!("{:?}", e)))?;

Expand Down Expand Up @@ -114,7 +118,7 @@ impl EthBridgeWorker {
self.provider.clone(),
unfinalized_contract_addresses.clone(),
)
.run()
.run(self.metrics.clone())
.await
.map_err(|e| anyhow::anyhow!(format!("{:?}", e)))?;

Expand Down Expand Up @@ -146,6 +150,11 @@ async fn process_eth_events(
mut eth_events_rx: mysten_metrics::metered_channel::Receiver<(EthAddress, u64, Vec<EthLog>)>,
finalized: bool,
) {
let progress_gauge = if finalized {
metrics.last_committed_eth_block.clone()
} else {
metrics.last_committed_unfinalized_eth_block.clone()
};
while let Some((_, _, logs)) = eth_events_rx.recv().await {
for log in logs.iter() {
let eth_bridge_event = EthBridgeEvent::try_from_eth_log(log);
Expand Down Expand Up @@ -240,8 +249,11 @@ async fn process_eth_events(
}
};

// TODO: we either scream here or keep retrying this until we succeed
if let Err(e) = write(&pg_pool, vec![transfer]) {
error!("Error writing token transfer to database: {:?}", e);
} else {
progress_gauge.set(block_number as i64);
}
}
}
Expand Down
24 changes: 22 additions & 2 deletions crates/sui-bridge-indexer/src/latest_eth_syncer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,16 @@ use ethers::types::Address as EthAddress;
use mysten_metrics::spawn_logged_monitored_task;
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Instant;
use sui_bridge::error::BridgeResult;
use sui_bridge::eth_client::EthClient;
use sui_bridge::retry_with_max_elapsed_time;
use sui_bridge::types::EthLog;
use tokio::task::JoinHandle;
use tokio::time::{self, Duration};
use tracing::error;
use tracing::{error, info};

use crate::metrics::BridgeIndexerMetrics;

const ETH_LOG_QUERY_MAX_BLOCK_RANGE: u64 = 1000;
const ETH_EVENTS_CHANNEL_SIZE: usize = 1000;
Expand Down Expand Up @@ -51,6 +54,7 @@ where

pub async fn run(
self,
metrics: BridgeIndexerMetrics,
) -> BridgeResult<(
Vec<JoinHandle<()>>,
mysten_metrics::metered_channel::Receiver<(EthAddress, u64, Vec<EthLog>)>,
Expand All @@ -66,16 +70,17 @@ where
let mut task_handles = vec![];
for (contract_address, start_block) in self.contract_addresses {
let eth_events_tx_clone = eth_evnets_tx.clone();
// let latest_block_rx_clone = latest_block_rx.clone();
let eth_client_clone = self.eth_client.clone();
let provider_clone = self.provider.clone();
let metrics_clone = metrics.clone();
task_handles.push(spawn_logged_monitored_task!(
Self::run_event_listening_task(
contract_address,
start_block,
provider_clone,
eth_events_tx_clone,
eth_client_clone,
metrics_clone,
)
));
}
Expand All @@ -88,6 +93,7 @@ where
provider: Arc<Provider<Http>>,
events_sender: mysten_metrics::metered_channel::Sender<(EthAddress, u64, Vec<EthLog>)>,
eth_client: Arc<EthClient<P>>,
metrics: BridgeIndexerMetrics,
) {
tracing::info!(contract_address=?contract_address, "Starting eth events listening task from block {start_block}");
loop {
Expand Down Expand Up @@ -116,14 +122,23 @@ where
// Each query does at most ETH_LOG_QUERY_MAX_BLOCK_RANGE blocks.
let end_block =
std::cmp::min(start_block + ETH_LOG_QUERY_MAX_BLOCK_RANGE - 1, new_block);
let timer = Instant::now();
let Ok(Ok(events)) = retry_with_max_elapsed_time!(
eth_client.get_events_in_range(contract_address, start_block, end_block),
Duration::from_secs(30)
) else {
error!("Failed to get events from eth client after retry");
continue;
};
info!(
?contract_address,
start_block,
end_block,
"Querying eth events took {:?}",
timer.elapsed()
);
let len = events.len();
let last_block = events.last().map(|e| e.block_number);

// Note 1: we always events to the channel even when it is empty. This is because of
// how `eth_getLogs` api is designed - we want cursor to move forward continuously.
Expand All @@ -143,6 +158,11 @@ where
"Observed {len} new Eth events",
);
}
if let Some(last_block) = last_block {
metrics
.last_synced_unfinalized_eth_block
.set(last_block as i64);
}
start_block = end_block + 1;
}
}
Expand Down
7 changes: 7 additions & 0 deletions crates/sui-bridge-indexer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,12 @@ pub mod metrics;
pub mod models;
pub mod postgres_manager;
pub mod schema;
pub mod sui_transaction_handler;
pub mod sui_transaction_queries;
pub mod sui_worker;
pub mod types;

#[derive(Clone)]
pub struct TokenTransfer {
chain_id: u8,
nonce: u64,
Expand All @@ -27,6 +31,7 @@ pub struct TokenTransfer {
data: Option<TokenTransferData>,
}

#[derive(Clone)]
pub struct TokenTransferData {
sender_address: Vec<u8>,
destination_chain: u8,
Expand Down Expand Up @@ -66,6 +71,7 @@ impl TokenTransfer {
}
}

#[derive(Clone)]
pub(crate) enum TokenTransferStatus {
DepositedUnfinalized,
Deposited,
Expand All @@ -85,6 +91,7 @@ impl Display for TokenTransferStatus {
}
}

#[derive(Clone)]
enum BridgeDataSource {
Sui,
Eth,
Expand Down
82 changes: 69 additions & 13 deletions crates/sui-bridge-indexer/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

use anyhow::Result;
use clap::*;
use mysten_metrics::spawn_logged_monitored_task;
use mysten_metrics::start_prometheus_server;
use prometheus::Registry;
use std::collections::{HashMap, HashSet};
Expand All @@ -12,11 +13,17 @@ use std::sync::Arc;
use sui_bridge::eth_client::EthClient;
use sui_bridge::metrics::BridgeMetrics;
use sui_bridge_indexer::eth_worker::EthBridgeWorker;
use sui_bridge_indexer::postgres_manager::{get_connection_pool, PgProgressStore};
use sui_bridge_indexer::postgres_manager::{
get_connection_pool, read_sui_progress_store, PgProgressStore,
};
use sui_bridge_indexer::sui_transaction_handler::handle_sui_transcations_loop;
use sui_bridge_indexer::sui_transaction_queries::start_sui_tx_polling_task;
use sui_bridge_indexer::sui_worker::SuiBridgeWorker;
use sui_bridge_indexer::{config::load_config, metrics::BridgeIndexerMetrics};
use sui_data_ingestion_core::{DataIngestionMetrics, IndexerExecutor, ReaderOptions, WorkerPool};
use sui_sdk::SuiClientBuilder;
use sui_types::messages_checkpoint::CheckpointSequenceNumber;
use tokio::task::JoinHandle;

use tokio::sync::oneshot;
use tracing::info;
Expand Down Expand Up @@ -71,6 +78,7 @@ async fn main() -> Result<()> {
// TODO: retry_with_max_elapsed_time
let eth_worker = EthBridgeWorker::new(
get_connection_pool(db_url.clone()),
bridge_metrics.clone(),
indexer_meterics.clone(),
config.clone(),
)
Expand All @@ -80,24 +88,43 @@ async fn main() -> Result<()> {
EthClient::<ethers::providers::Http>::new(
&config.eth_rpc_url,
HashSet::from_iter(vec![eth_worker.bridge_address()]),
bridge_metrics,
bridge_metrics.clone(),
)
.await
.map_err(|e| anyhow::anyhow!(e.to_string()))?,
);

let unfinalized_handle = eth_worker.start_indexing_unfinalized_events(eth_client.clone());
let finalized_handle = eth_worker.start_indexing_finalized_events(eth_client.clone());

// TODO: add retry_with_max_elapsed_time
let progress = start_processing_sui_checkpoints(
&config_clone,
db_url,
indexer_meterics,
ingestion_metrics,
);
let unfinalized_handle = eth_worker
.start_indexing_unfinalized_events(eth_client.clone())
.await
.unwrap();
let finalized_handle = eth_worker
.start_indexing_finalized_events(eth_client.clone())
.await
.unwrap();
let handles = vec![unfinalized_handle, finalized_handle];

if let Some(sui_rpc_url) = config.sui_rpc_url.clone() {
start_processing_sui_checkpoints_by_querying_txes(
sui_rpc_url,
db_url.clone(),
indexer_meterics.clone(),
bridge_metrics,
)
.await
.unwrap();
} else {
let _ = start_processing_sui_checkpoints(
&config_clone,
db_url,
indexer_meterics,
ingestion_metrics,
)
.await;
}

let _ = tokio::try_join!(finalized_handle, unfinalized_handle, progress);
// We are not waiting for the sui tasks to finish here, which is ok.
let _ = futures::future::join_all(handles).await;

Ok(())
}
Expand Down Expand Up @@ -137,3 +164,32 @@ async fn start_processing_sui_checkpoints(
)
.await
}

async fn start_processing_sui_checkpoints_by_querying_txes(
sui_rpc_url: String,
db_url: String,
indexer_metrics: BridgeIndexerMetrics,
bridge_metrics: Arc<BridgeMetrics>,
) -> Result<Vec<JoinHandle<()>>> {
let pg_pool = get_connection_pool(db_url.clone());
let (tx, rx) = mysten_metrics::metered_channel::channel(
100,
&mysten_metrics::get_metrics()
.unwrap()
.channel_inflight
.with_label_values(&["sui_transaction_processing_queue"]),
);
let mut handles = vec![];
let cursor =
read_sui_progress_store(&pg_pool).expect("Failed to read cursor from sui progress store");
let sui_client = SuiClientBuilder::default().build(sui_rpc_url).await?;
handles.push(spawn_logged_monitored_task!(
start_sui_tx_polling_task(sui_client, cursor, tx, bridge_metrics),
"start_sui_tx_polling_task"
));
handles.push(spawn_logged_monitored_task!(
handle_sui_transcations_loop(pg_pool.clone(), rx, indexer_metrics.clone()),
"handle_sui_transcations_loop"
));
Ok(handles)
}
Loading

0 comments on commit 3ecaaa6

Please sign in to comment.