Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[bridge indexer] query transactions #18400

Merged
merged 3 commits into from
Jun 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions crates/sui-bridge-indexer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,12 @@ edition = "2021"

[dependencies]
serde.workspace = true
tap.workspace = true
diesel = { version = "2.1.4", features = ["postgres", "r2d2", "serde_json"] }
ethers = "2.0"
tokio = { workspace = true, features = ["full"] }
anyhow.workspace = true
futures.workspace = true
async-trait.workspace = true
bcs.workspace = true
bin-version.workspace = true
Expand All @@ -20,6 +22,8 @@ mysten-metrics.workspace = true
prometheus.workspace = true
serde_yaml.workspace = true
sui-bridge.workspace = true
sui-sdk.workspace = true
sui-json-rpc-types.workspace = true
sui-data-ingestion-core.workspace = true
sui-types.workspace = true
telemetry-subscribers.workspace = true
Expand Down
1 change: 1 addition & 0 deletions crates/sui-bridge-indexer/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ pub struct Config {
pub start_block: u64,
pub metric_url: String,
pub metric_port: u16,
pub sui_rpc_url: Option<String>,
}

/// Load the config to run.
Expand Down
18 changes: 15 additions & 3 deletions crates/sui-bridge-indexer/src/eth_worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,18 @@ use std::collections::HashMap;
use std::str::FromStr;
use std::sync::Arc;
use sui_bridge::abi::{EthBridgeEvent, EthSuiBridgeEvents};
use sui_bridge::metrics::BridgeMetrics;
use sui_bridge::types::EthLog;
use sui_bridge::{eth_client::EthClient, eth_syncer::EthSyncer};
use tokio::task::JoinHandle;
use tracing::info;
use tracing::log::error;

#[derive(Clone, Debug)]
#[derive(Clone)]
pub struct EthBridgeWorker {
provider: Arc<Provider<Http>>,
pg_pool: PgPool,
bridge_metrics: Arc<BridgeMetrics>,
metrics: BridgeIndexerMetrics,
bridge_address: EthAddress,
config: Config,
Expand All @@ -34,6 +36,7 @@ pub struct EthBridgeWorker {
impl EthBridgeWorker {
pub fn new(
pg_pool: PgPool,
bridge_metrics: Arc<BridgeMetrics>,
metrics: BridgeIndexerMetrics,
config: Config,
) -> Result<Self, Box<dyn std::error::Error>> {
Expand All @@ -47,6 +50,7 @@ impl EthBridgeWorker {
Ok(Self {
provider,
pg_pool,
bridge_metrics,
metrics,
bridge_address,
config,
Expand All @@ -69,7 +73,7 @@ impl EthBridgeWorker {

let (_task_handles, eth_events_rx, _) =
EthSyncer::new(eth_client, finalized_contract_addresses)
.run()
.run(self.bridge_metrics.clone())
.await
.map_err(|e| anyhow::anyhow!(format!("{:?}", e)))?;

Expand Down Expand Up @@ -114,7 +118,7 @@ impl EthBridgeWorker {
self.provider.clone(),
unfinalized_contract_addresses.clone(),
)
.run()
.run(self.metrics.clone())
.await
.map_err(|e| anyhow::anyhow!(format!("{:?}", e)))?;

Expand Down Expand Up @@ -146,6 +150,11 @@ async fn process_eth_events(
mut eth_events_rx: mysten_metrics::metered_channel::Receiver<(EthAddress, u64, Vec<EthLog>)>,
finalized: bool,
) {
let progress_gauge = if finalized {
metrics.last_committed_eth_block.clone()
} else {
metrics.last_committed_unfinalized_eth_block.clone()
};
while let Some((_, _, logs)) = eth_events_rx.recv().await {
for log in logs.iter() {
let eth_bridge_event = EthBridgeEvent::try_from_eth_log(log);
Expand Down Expand Up @@ -240,8 +249,11 @@ async fn process_eth_events(
}
};

// TODO: we either scream here or keep retrying this until we succeed
if let Err(e) = write(&pg_pool, vec![transfer]) {
error!("Error writing token transfer to database: {:?}", e);
} else {
progress_gauge.set(block_number as i64);
}
}
}
Expand Down
24 changes: 22 additions & 2 deletions crates/sui-bridge-indexer/src/latest_eth_syncer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,16 @@ use ethers::types::Address as EthAddress;
use mysten_metrics::spawn_logged_monitored_task;
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Instant;
use sui_bridge::error::BridgeResult;
use sui_bridge::eth_client::EthClient;
use sui_bridge::retry_with_max_elapsed_time;
use sui_bridge::types::EthLog;
use tokio::task::JoinHandle;
use tokio::time::{self, Duration};
use tracing::error;
use tracing::{error, info};

use crate::metrics::BridgeIndexerMetrics;

const ETH_LOG_QUERY_MAX_BLOCK_RANGE: u64 = 1000;
const ETH_EVENTS_CHANNEL_SIZE: usize = 1000;
Expand Down Expand Up @@ -51,6 +54,7 @@ where

pub async fn run(
self,
metrics: BridgeIndexerMetrics,
) -> BridgeResult<(
Vec<JoinHandle<()>>,
mysten_metrics::metered_channel::Receiver<(EthAddress, u64, Vec<EthLog>)>,
Expand All @@ -66,16 +70,17 @@ where
let mut task_handles = vec![];
for (contract_address, start_block) in self.contract_addresses {
let eth_events_tx_clone = eth_evnets_tx.clone();
// let latest_block_rx_clone = latest_block_rx.clone();
let eth_client_clone = self.eth_client.clone();
let provider_clone = self.provider.clone();
let metrics_clone = metrics.clone();
task_handles.push(spawn_logged_monitored_task!(
Self::run_event_listening_task(
contract_address,
start_block,
provider_clone,
eth_events_tx_clone,
eth_client_clone,
metrics_clone,
)
));
}
Expand All @@ -88,6 +93,7 @@ where
provider: Arc<Provider<Http>>,
events_sender: mysten_metrics::metered_channel::Sender<(EthAddress, u64, Vec<EthLog>)>,
eth_client: Arc<EthClient<P>>,
metrics: BridgeIndexerMetrics,
) {
tracing::info!(contract_address=?contract_address, "Starting eth events listening task from block {start_block}");
loop {
Expand Down Expand Up @@ -116,14 +122,23 @@ where
// Each query does at most ETH_LOG_QUERY_MAX_BLOCK_RANGE blocks.
let end_block =
std::cmp::min(start_block + ETH_LOG_QUERY_MAX_BLOCK_RANGE - 1, new_block);
let timer = Instant::now();
let Ok(Ok(events)) = retry_with_max_elapsed_time!(
eth_client.get_events_in_range(contract_address, start_block, end_block),
Duration::from_secs(30)
) else {
error!("Failed to get events from eth client after retry");
continue;
};
info!(
?contract_address,
start_block,
end_block,
"Querying eth events took {:?}",
timer.elapsed()
);
let len = events.len();
let last_block = events.last().map(|e| e.block_number);

// Note 1: we always events to the channel even when it is empty. This is because of
// how `eth_getLogs` api is designed - we want cursor to move forward continuously.
Expand All @@ -143,6 +158,11 @@ where
"Observed {len} new Eth events",
);
}
if let Some(last_block) = last_block {
metrics
.last_synced_unfinalized_eth_block
.set(last_block as i64);
}
start_block = end_block + 1;
}
}
Expand Down
7 changes: 7 additions & 0 deletions crates/sui-bridge-indexer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,12 @@ pub mod metrics;
pub mod models;
pub mod postgres_manager;
pub mod schema;
pub mod sui_transaction_handler;
pub mod sui_transaction_queries;
pub mod sui_worker;
pub mod types;

#[derive(Clone)]
pub struct TokenTransfer {
chain_id: u8,
nonce: u64,
Expand All @@ -27,6 +31,7 @@ pub struct TokenTransfer {
data: Option<TokenTransferData>,
}

#[derive(Clone)]
pub struct TokenTransferData {
sender_address: Vec<u8>,
destination_chain: u8,
Expand Down Expand Up @@ -66,6 +71,7 @@ impl TokenTransfer {
}
}

#[derive(Clone)]
pub(crate) enum TokenTransferStatus {
DepositedUnfinalized,
Deposited,
Expand All @@ -85,6 +91,7 @@ impl Display for TokenTransferStatus {
}
}

#[derive(Clone)]
enum BridgeDataSource {
Sui,
Eth,
Expand Down
82 changes: 69 additions & 13 deletions crates/sui-bridge-indexer/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

use anyhow::Result;
use clap::*;
use mysten_metrics::spawn_logged_monitored_task;
use mysten_metrics::start_prometheus_server;
use prometheus::Registry;
use std::collections::{HashMap, HashSet};
Expand All @@ -12,11 +13,17 @@ use std::sync::Arc;
use sui_bridge::eth_client::EthClient;
use sui_bridge::metrics::BridgeMetrics;
use sui_bridge_indexer::eth_worker::EthBridgeWorker;
use sui_bridge_indexer::postgres_manager::{get_connection_pool, PgProgressStore};
use sui_bridge_indexer::postgres_manager::{
get_connection_pool, read_sui_progress_store, PgProgressStore,
};
use sui_bridge_indexer::sui_transaction_handler::handle_sui_transcations_loop;
use sui_bridge_indexer::sui_transaction_queries::start_sui_tx_polling_task;
use sui_bridge_indexer::sui_worker::SuiBridgeWorker;
use sui_bridge_indexer::{config::load_config, metrics::BridgeIndexerMetrics};
use sui_data_ingestion_core::{DataIngestionMetrics, IndexerExecutor, ReaderOptions, WorkerPool};
use sui_sdk::SuiClientBuilder;
use sui_types::messages_checkpoint::CheckpointSequenceNumber;
use tokio::task::JoinHandle;

use tokio::sync::oneshot;
use tracing::info;
Expand Down Expand Up @@ -71,6 +78,7 @@ async fn main() -> Result<()> {
// TODO: retry_with_max_elapsed_time
let eth_worker = EthBridgeWorker::new(
get_connection_pool(db_url.clone()),
bridge_metrics.clone(),
indexer_meterics.clone(),
config.clone(),
)
Expand All @@ -80,24 +88,43 @@ async fn main() -> Result<()> {
EthClient::<ethers::providers::Http>::new(
&config.eth_rpc_url,
HashSet::from_iter(vec![eth_worker.bridge_address()]),
bridge_metrics,
bridge_metrics.clone(),
)
.await
.map_err(|e| anyhow::anyhow!(e.to_string()))?,
);

let unfinalized_handle = eth_worker.start_indexing_unfinalized_events(eth_client.clone());
let finalized_handle = eth_worker.start_indexing_finalized_events(eth_client.clone());

// TODO: add retry_with_max_elapsed_time
let progress = start_processing_sui_checkpoints(
&config_clone,
db_url,
indexer_meterics,
ingestion_metrics,
);
let unfinalized_handle = eth_worker
.start_indexing_unfinalized_events(eth_client.clone())
.await
.unwrap();
let finalized_handle = eth_worker
.start_indexing_finalized_events(eth_client.clone())
.await
.unwrap();
let handles = vec![unfinalized_handle, finalized_handle];

if let Some(sui_rpc_url) = config.sui_rpc_url.clone() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd like to talk about how we plug in a data reader. Let's make sure to talk at our daily later

start_processing_sui_checkpoints_by_querying_txes(
sui_rpc_url,
db_url.clone(),
indexer_meterics.clone(),
bridge_metrics,
)
.await
.unwrap();
} else {
let _ = start_processing_sui_checkpoints(
&config_clone,
db_url,
indexer_meterics,
ingestion_metrics,
)
.await;
}

let _ = tokio::try_join!(finalized_handle, unfinalized_handle, progress);
// We are not waiting for the sui tasks to finish here, which is ok.
let _ = futures::future::join_all(handles).await;

Ok(())
}
Expand Down Expand Up @@ -137,3 +164,32 @@ async fn start_processing_sui_checkpoints(
)
.await
}

async fn start_processing_sui_checkpoints_by_querying_txes(
sui_rpc_url: String,
db_url: String,
indexer_metrics: BridgeIndexerMetrics,
bridge_metrics: Arc<BridgeMetrics>,
) -> Result<Vec<JoinHandle<()>>> {
let pg_pool = get_connection_pool(db_url.clone());
let (tx, rx) = mysten_metrics::metered_channel::channel(
100,
&mysten_metrics::get_metrics()
.unwrap()
.channel_inflight
.with_label_values(&["sui_transaction_processing_queue"]),
);
let mut handles = vec![];
let cursor =
read_sui_progress_store(&pg_pool).expect("Failed to read cursor from sui progress store");
let sui_client = SuiClientBuilder::default().build(sui_rpc_url).await?;
handles.push(spawn_logged_monitored_task!(
start_sui_tx_polling_task(sui_client, cursor, tx, bridge_metrics),
"start_sui_tx_polling_task"
));
handles.push(spawn_logged_monitored_task!(
handle_sui_transcations_loop(pg_pool.clone(), rx, indexer_metrics.clone()),
"handle_sui_transcations_loop"
));
Ok(handles)
}
Loading
Loading