diff --git a/Cargo.lock b/Cargo.lock index 0ea4e18f5bccc4..e0cfbae51c0132 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8384,7 +8384,7 @@ dependencies = [ [[package]] name = "odin" version = "0.1.0" -source = "git+ssh://git@github.com/nightly-labs/alexandria.git?rev=ce64446#ce64446d5ebe07fa36ab586ab169f50d65f607d1" +source = "git+ssh://git@github.com/nightly-labs/alexandria.git?rev=063e6ff#063e6ff6ed0e6fe8ba6c92664498350ece3e88ea" dependencies = [ "async-nats", "bitcode", @@ -11723,7 +11723,7 @@ checksum = "7da8b5736845d9f2fcb837ea5d9e2628564b3b043a70948a3f0b778838c5fb4f" [[package]] name = "structs" version = "0.1.0" -source = "git+ssh://git@github.com/nightly-labs/alexandria.git?rev=ce64446#ce64446d5ebe07fa36ab586ab169f50d65f607d1" +source = "git+ssh://git@github.com/nightly-labs/alexandria.git?rev=063e6ff#063e6ff6ed0e6fe8ba6c92664498350ece3e88ea" dependencies = [ "bitcode", "dotenvy", @@ -11732,6 +11732,7 @@ dependencies = [ "serde", "serde_json", "serde_with 3.8.1", + "strum 0.26.3", "typeshare", ] @@ -11753,6 +11754,15 @@ dependencies = [ "strum_macros 0.25.2", ] +[[package]] +name = "strum" +version = "0.26.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8fec0f0aef304996cf250b31b5a10dee7980c85da9d759361292b8bca5a18f06" +dependencies = [ + "strum_macros 0.26.4", +] + [[package]] name = "strum_macros" version = "0.24.3" @@ -11779,6 +11789,19 @@ dependencies = [ "syn 2.0.48", ] +[[package]] +name = "strum_macros" +version = "0.26.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4c6bee85a5a24955dc440386795aa378cd9cf82acd5f764469152d2270e581be" +dependencies = [ + "heck 0.5.0", + "proc-macro2 1.0.78", + "quote 1.0.35", + "rustversion", + "syn 2.0.48", +] + [[package]] name = "subprocess" version = "0.2.9" diff --git a/Cargo.toml b/Cargo.toml index 9d6d4fb0664966..0536ab87ebc02f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -347,7 +347,7 @@ hashbrown = "0.12" hdrhistogram = "7.5.1" hex = "0.4.3" hex-literal = "0.3.4" -highlight = "all" +highlight = "0.0.3" http = "0.2.8" http-body = "0.4.5" humantime = "2.1.0" @@ -716,7 +716,7 @@ semver = "1.0.16" spinners = "4.1.0" include_dir = "0.7.3" -odin = { git = "ssh://git@github.com/nightly-labs/alexandria.git", rev = "ce64446", package = "odin" } +odin = { git = "ssh://git@github.com/nightly-labs/alexandria.git", rev = "063e6ff", package = "odin" } [patch.crates-io] quinn-proto = { git = "https://github.com/quinn-rs/quinn.git", rev = "f0fa66f871b80b9d2d7075d76967c649aecc0b77" } diff --git a/crates/sui-indexer/src/handlers/checkpoint_handler.rs b/crates/sui-indexer/src/handlers/checkpoint_handler.rs index 98dc4b70d2baaf..d7b214f3154927 100644 --- a/crates/sui-indexer/src/handlers/checkpoint_handler.rs +++ b/crates/sui-indexer/src/handlers/checkpoint_handler.rs @@ -10,8 +10,13 @@ use itertools::Itertools; use move_core_types::annotated_value::{MoveStructLayout, MoveTypeLayout}; use move_core_types::language_storage::{StructTag, TypeTag}; use mysten_metrics::{get_metrics, spawn_monitored_task}; +use odin::structs::sui_notifications::{ + CoinReceived, CoinSent, CoinSwap, NftBurned, NftMinted, NftReceived, NftSent, + SuiIndexerNotification, +}; use odin::sui_ws::{ AccountObjectsUpdate, CoinCreated, CoinMutated, CoinObjectUpdateStatus, ObjectChangeUpdate, + ObjectUpdateStatus, }; use odin::sui_ws::{SuiWsApiMsg, TokenBalanceUpdate, TokenUpdate}; use std::collections::{BTreeMap, HashMap}; @@ -1052,3 +1057,146 @@ pub fn generate_ws_updates_from_checkpoint_data( return (checkpoint_data.checkpoint.sequence_number, updates); } + +// for now just clone the method from above and modify it to return SuiIndexerNotification +pub fn generate_notifications_updates_from_checkpoint_data( + checkpoint_data: &CustomCheckpointDataToCommit, +) -> (u64, BTreeMap>) { + let mut notifications: BTreeMap> = BTreeMap::new(); + + for transaction in &checkpoint_data.transactions { + let mut transaction_coin_changes: HashMap> = HashMap::new(); // user_address -> coin_type -> total_change + let transaction_id = transaction.tx_sequence_number; + + // Process balance changes + for change in transaction.balance_change.iter() { + let user_address = match change.owner.get_owner_address() { + Ok(address) => address.to_string(), + Err(_) => continue, + }; + let coin_type = change.coin_type.to_canonical_string(true); + + let total_change = transaction_coin_changes + .entry(user_address) + .or_insert_with(HashMap::new) + .entry(coin_type) + .or_insert(0); + + *total_change += change.amount; + } + + // Generate notifications + for (sui_address, coin_changes) in transaction_coin_changes.iter() { + if coin_changes.len() < 3 { + continue; // Not enough changes for a swap + } + + let mut sui_change = 0i128; + let mut base_coin: Option<(String, i128)> = None; + let mut quote_coin: Option<(String, i128)> = None; + + for (coin_type, amount) in coin_changes.iter() { + if coin_type == "0x2::sui::SUI" { + sui_change = *amount; + } else if *amount < 0 && base_coin.is_none() { + base_coin = Some((coin_type.clone(), -amount)); // Store positive amount + } else if *amount > 0 && quote_coin.is_none() { + quote_coin = Some((coin_type.clone(), *amount)); + } + } + + // Check if we have a valid swap scenario + if sui_change < 0 && base_coin.is_some() && quote_coin.is_some() { + let (base_coin_type, base_amount) = base_coin.unwrap(); + let (quote_coin_type, quote_amount) = quote_coin.unwrap(); + + notifications + .entry(transaction_id) + .or_insert_with(Vec::new) + .push(SuiIndexerNotification::CoinSwap(CoinSwap { + sui_address: sui_address.clone(), + base_coin_type, + base_amount, + quote_coin_type, + quote_amount, + })); + } else { + // If not a swap, generate individual send/receive notifications + for (coin_type, amount) in coin_changes.iter() { + if coin_type == "0x2::sui::SUI" { + continue; // Skip SUI changes + } + if *amount < 0 { + notifications + .entry(transaction_id) + .or_insert_with(Vec::new) + .push(SuiIndexerNotification::CoinSent(CoinSent { + sender_address: sui_address.clone(), + coin_type: coin_type.clone(), + amount: -amount, // Convert to positive + })); + } else if *amount > 0 { + notifications + .entry(transaction_id) + .or_insert_with(Vec::new) + .push(SuiIndexerNotification::CoinReceived(CoinReceived { + receiver_address: sui_address.clone(), + coin_type: coin_type.clone(), + amount: *amount, + })); + } + } + } + } + + // Process custom object changes (NFT part remains unchanged) + for (change_owner, object_change) in &transaction.custom_object_changes { + if let Some(owner) = change_owner { + let owner_address = owner.to_string(); + let nft_id = object_change.object_id.to_string(); + + match &object_change.status { + ObjectUpdateStatus::Created => { + notifications + .entry(transaction_id) + .or_insert_with(Vec::new) + .push(SuiIndexerNotification::NftMinted(NftMinted { + sui_address: owner_address, + nft_id, + })); + } + ObjectUpdateStatus::Received(received) => { + notifications + .entry(transaction_id) + .or_insert_with(Vec::new) + .push(SuiIndexerNotification::NftReceived(NftReceived { + receiver_address: received.receiver_address.clone(), + nft_id, + })); + } + ObjectUpdateStatus::Deleted => { + notifications + .entry(transaction_id) + .or_insert_with(Vec::new) + .push(SuiIndexerNotification::NftBurned(NftBurned { + sui_address: owner_address, + nft_id, + })); + } + ObjectUpdateStatus::Sent(sent) => { + notifications + .entry(transaction_id) + .or_insert_with(Vec::new) + .push(SuiIndexerNotification::NftSent(NftSent { + sender_address: sent.sender_address.clone(), + nft_id, + })); + } + _ => {} + } + } + } + } + + (checkpoint_data.checkpoint.sequence_number, notifications) +} diff --git a/crates/sui-indexer/src/main.rs b/crates/sui-indexer/src/main.rs index 1e9c691f8cb719..44bb2f63c1ac4a 100644 --- a/crates/sui-indexer/src/main.rs +++ b/crates/sui-indexer/src/main.rs @@ -4,7 +4,7 @@ use std::sync::Arc; use clap::Parser; -use odin::{get_odin, ConnectOptions, Odin}; +use odin::{ConnectOptions, Odin}; use sui_types::nats_queue::nats_queue; use tracing::info; @@ -42,11 +42,11 @@ async fn main() -> Result<(), IndexerError> { // TODO update on launch let odin = Odin::connect( Some(vec![ - "nats://localhost:4228".to_string(), - "nats://localhost:4229".to_string(), + "nats://65.108.110.11:4222".to_string(), + "nats://95.216.243.233:4222".to_string(), ]), Some(ConnectOptions::with_user_and_password( - "alexandria".to_string(), + "very-stronk-alexandria".to_string(), "alexandria".to_string(), )), ) diff --git a/crates/sui-types/src/nats_queue.rs b/crates/sui-types/src/nats_queue.rs index 4854c08387c5f1..5ed6928887f67d 100644 --- a/crates/sui-types/src/nats_queue.rs +++ b/crates/sui-types/src/nats_queue.rs @@ -2,7 +2,7 @@ // This task is responsible for gathering all the messages and sending them in correct order // We will use a priority queue to store the messages and send them in correct order -use odin::{sui_ws::SuiWsApiMsg, Odin}; +use odin::{structs::sui_notifications::SuiIndexerNotification, sui_ws::SuiWsApiMsg, Odin}; use std::{collections::BTreeMap, sync::Arc}; use tokio::sync::{ mpsc::{channel, Receiver, Sender}, @@ -11,22 +11,29 @@ use tokio::sync::{ use tracing::info; pub type WsPayload = (u64, Vec); +pub type NotificationPayload = (u64, BTreeMap>); pub struct NatsQueueSender { pub init_checkpoint: u64, pub sender: Arc>, pub receiver: Arc>>, + pub notifications_sender: Arc>, + pub notifications_receiver: Arc>>, odin: Arc, } pub fn nats_queue(odin: Arc) -> NatsQueueSender { - // Create sender and receiver - let (tx, rx) = channel::(10_000); // 10k + // Create sender and receiver for ws updates + let (tx, rx) = channel::(10_000); + // Create sender and receiver for notifications + let (tx_notifications, rx_notifications) = channel::(10_000); NatsQueueSender { init_checkpoint: u64::MAX, sender: Arc::new(tx), receiver: Arc::new(Mutex::new(rx)), + notifications_sender: Arc::new(tx_notifications), + notifications_receiver: Arc::new(Mutex::new(rx_notifications)), odin, } } @@ -38,6 +45,7 @@ impl NatsQueueSender { let receiver = self.receiver.clone(); let init_checkpoint = self.init_checkpoint; + // Task for ws updates tokio::spawn(async move { let mut next_index: u64 = init_checkpoint; // MAX means we have not received any message yet @@ -96,5 +104,73 @@ impl NatsQueueSender { } } }); + + // Task for notifications + let odin = self.odin.clone(); + let notifications_receiver = self.notifications_receiver.clone(); + tokio::spawn(async move { + let mut next_index: u64 = init_checkpoint; // MAX means we have not received any message yet + + let mut receiver_lock = notifications_receiver.lock().await; + + // Log init checkpoint + info!( + "Nats notifications queue init checkpoint: {}", + init_checkpoint + ); + + //Cache if we get a message with a block number that is not in order + let mut bmap_checkpoints: BTreeMap>> = + BTreeMap::new(); + + while let Some((checkpoint_seq_number, notifications)) = receiver_lock.recv().await { + // Check if we have not received any message yet + if next_index == u64::MAX { + next_index = checkpoint_seq_number + } + // Check if correct order + if checkpoint_seq_number == next_index { + // Send message + info!( + "Sending: {} notifications with seq number {}", + notifications.len(), + next_index + ); + + // Iter over notifications and ordered by sequence number send them + for (_, notifications) in notifications.iter() { + odin.publish_sui_notifications(¬ifications).await; + } + + // Update next index + next_index = next_index + 1; + // Check if we have any cached messages + while let Some(next_checkpoint) = bmap_checkpoints.remove(&next_index) { + info!( + "Sending: {} cached notifications with seq number {}", + next_checkpoint.len(), + next_index + ); + // Iter over notifications and ordered by sequence number send them + for (_, notifications) in notifications.iter() { + odin.publish_sui_notifications(¬ifications).await; + } + + // Update next index + next_index = next_index + 1; + } + } else { + info!( + "Received checkpoint with seq number {} but expected {}", + checkpoint_seq_number, next_index + ); + // Cache message + bmap_checkpoints + .entry(checkpoint_seq_number) + .or_insert(BTreeMap::new()) + .extend(notifications); + } + } + }); } }