From 9e494cb50e62dfc8af5d394691b1fcb4075cae18 Mon Sep 17 00:00:00 2001 From: Giems <109511301+Giems@users.noreply.github.com> Date: Mon, 19 Aug 2024 15:39:25 +0200 Subject: [PATCH 1/3] notifications init --- Cargo.lock | 27 ++- Cargo.toml | 4 +- .../src/handlers/checkpoint_handler.rs | 221 ++++++++++++++---- crates/sui-indexer/src/main.rs | 2 +- crates/sui-types/src/nats_queue.rs | 92 +++++++- 5 files changed, 285 insertions(+), 61 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 0ea4e18f5bccc..e0cfbae51c013 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8384,7 +8384,7 @@ dependencies = [ [[package]] name = "odin" version = "0.1.0" -source = "git+ssh://git@github.com/nightly-labs/alexandria.git?rev=ce64446#ce64446d5ebe07fa36ab586ab169f50d65f607d1" +source = "git+ssh://git@github.com/nightly-labs/alexandria.git?rev=063e6ff#063e6ff6ed0e6fe8ba6c92664498350ece3e88ea" dependencies = [ "async-nats", "bitcode", @@ -11723,7 +11723,7 @@ checksum = "7da8b5736845d9f2fcb837ea5d9e2628564b3b043a70948a3f0b778838c5fb4f" [[package]] name = "structs" version = "0.1.0" -source = "git+ssh://git@github.com/nightly-labs/alexandria.git?rev=ce64446#ce64446d5ebe07fa36ab586ab169f50d65f607d1" +source = "git+ssh://git@github.com/nightly-labs/alexandria.git?rev=063e6ff#063e6ff6ed0e6fe8ba6c92664498350ece3e88ea" dependencies = [ "bitcode", "dotenvy", @@ -11732,6 +11732,7 @@ dependencies = [ "serde", "serde_json", "serde_with 3.8.1", + "strum 0.26.3", "typeshare", ] @@ -11753,6 +11754,15 @@ dependencies = [ "strum_macros 0.25.2", ] +[[package]] +name = "strum" +version = "0.26.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8fec0f0aef304996cf250b31b5a10dee7980c85da9d759361292b8bca5a18f06" +dependencies = [ + "strum_macros 0.26.4", +] + [[package]] name = "strum_macros" version = "0.24.3" @@ -11779,6 +11789,19 @@ dependencies = [ "syn 2.0.48", ] +[[package]] +name = "strum_macros" +version = "0.26.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4c6bee85a5a24955dc440386795aa378cd9cf82acd5f764469152d2270e581be" +dependencies = [ + "heck 0.5.0", + "proc-macro2 1.0.78", + "quote 1.0.35", + "rustversion", + "syn 2.0.48", +] + [[package]] name = "subprocess" version = "0.2.9" diff --git a/Cargo.toml b/Cargo.toml index 9d6d4fb066496..0536ab87ebc02 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -347,7 +347,7 @@ hashbrown = "0.12" hdrhistogram = "7.5.1" hex = "0.4.3" hex-literal = "0.3.4" -highlight = "all" +highlight = "0.0.3" http = "0.2.8" http-body = "0.4.5" humantime = "2.1.0" @@ -716,7 +716,7 @@ semver = "1.0.16" spinners = "4.1.0" include_dir = "0.7.3" -odin = { git = "ssh://git@github.com/nightly-labs/alexandria.git", rev = "ce64446", package = "odin" } +odin = { git = "ssh://git@github.com/nightly-labs/alexandria.git", rev = "063e6ff", package = "odin" } [patch.crates-io] quinn-proto = { git = "https://github.com/quinn-rs/quinn.git", rev = "f0fa66f871b80b9d2d7075d76967c649aecc0b77" } diff --git a/crates/sui-indexer/src/handlers/checkpoint_handler.rs b/crates/sui-indexer/src/handlers/checkpoint_handler.rs index 98dc4b70d2baa..5e81eca6f06d1 100644 --- a/crates/sui-indexer/src/handlers/checkpoint_handler.rs +++ b/crates/sui-indexer/src/handlers/checkpoint_handler.rs @@ -6,12 +6,18 @@ use crate::handlers::tx_processor::IndexingPackageBuffer; use crate::handlers::CustomCheckpointDataToCommit; use crate::models::display::StoredDisplay; use async_trait::async_trait; +use chrono::Utc; use itertools::Itertools; use move_core_types::annotated_value::{MoveStructLayout, MoveTypeLayout}; use move_core_types::language_storage::{StructTag, TypeTag}; use mysten_metrics::{get_metrics, spawn_monitored_task}; +use odin::structs::sui_notifications::{ + CoinReceived, CoinSent, CoinSwap, NftBurned, NftMinted, NftReceived, NftSent, + SuiIndexerNotification, +}; use odin::sui_ws::{ AccountObjectsUpdate, CoinCreated, CoinMutated, CoinObjectUpdateStatus, ObjectChangeUpdate, + ObjectUpdateStatus, }; use odin::sui_ws::{SuiWsApiMsg, TokenBalanceUpdate, TokenUpdate}; use std::collections::{BTreeMap, HashMap}; @@ -25,7 +31,7 @@ use sui_types::dynamic_field::DynamicFieldType; use sui_types::messages_checkpoint::{ CertifiedCheckpointSummary, CheckpointContents, CheckpointSequenceNumber, }; -use sui_types::nats_queue::{NatsQueueSender, WsPayload}; +use sui_types::nats_queue::{NatsQueueSender, NotificationPayload, WsPayload}; use sui_types::object::Object; use tokio::sync::watch; use tokio_util::sync::CancellationToken; @@ -157,8 +163,13 @@ where .await?; // Send ws updates via nats - let nats_ws_payload = generate_ws_updates_from_checkpoint_data(&checkpoint_data); - self.nats_queue.sender.send(nats_ws_payload).await?; + let (nats_ws_payload, notifications_payload) = + generate_updates_from_checkpoint_data(&checkpoint_data); + self.nats_queue.ws_sender.send(nats_ws_payload).await?; + self.nats_queue + .notifications_sender + .send(notifications_payload) + .await?; // Convert custom checkpoint data into original type self.indexed_checkpoint_sender @@ -953,102 +964,216 @@ fn try_create_dynamic_field_info( })) } -pub fn generate_ws_updates_from_checkpoint_data( +pub fn generate_updates_from_checkpoint_data( checkpoint_data: &CustomCheckpointDataToCommit, -) -> WsPayload { +) -> (WsPayload, NotificationPayload) { let block_number = checkpoint_data.checkpoint.sequence_number; - + let mut notifications: BTreeMap> = BTreeMap::new(); let mut ws_balance_changes: HashMap = HashMap::new(); - let mut account_object_changes: HashMap = HashMap::new(); // account address -> account object changes + let mut account_object_changes: HashMap = HashMap::new(); let mut objects_changes: Vec = Vec::new(); - // transaction balance changes - for transaction in checkpoint_data.transactions.iter() { - for change in transaction.balance_change.iter() { - let user_address = match &change.owner.get_owner_address() { + for transaction in &checkpoint_data.transactions { + let transaction_id = transaction.tx_sequence_number; + let mut transaction_coin_changes: HashMap> = HashMap::new(); + + // Process balance changes + for change in &transaction.balance_change { + let user_address = match change.owner.get_owner_address() { Ok(address) => address.to_string(), Err(_) => continue, }; let coin_type = change.coin_type.to_canonical_string(true); - // Prepare ws update + // Update transaction_coin_changes for notifications + let total_change = transaction_coin_changes + .entry(user_address.clone()) + .or_insert_with(HashMap::new) + .entry(coin_type.clone()) + .or_insert(0); + *total_change += change.amount; + + // Update ws_balance_changes for WS updates let ws_update = ws_balance_changes .entry(user_address.clone()) .or_insert(TokenBalanceUpdate { - sui_address: user_address.clone(), + sui_address: user_address, sequence_number: block_number, changed_balances: HashMap::new(), - timestamp_ms: chrono::Utc::now().timestamp_millis() as u64, + timestamp_ms: Utc::now().timestamp_millis() as u64, }) .changed_balances .entry(coin_type.clone()) .or_insert(TokenUpdate { - coin_type, + coin_type: coin_type.clone(), object_changes: HashMap::new(), }); + match change.status { - // New coin is created ObjectStatus::Created => { - // Update ws update - ws_update - .object_changes - .entry(change.object_id.clone()) - .or_insert(CoinObjectUpdateStatus::Created(CoinCreated { + ws_update.object_changes.insert( + change.object_id.clone(), + CoinObjectUpdateStatus::Created(CoinCreated { amount: change.amount, - })); + }), + ); } ObjectStatus::Mutated => { - // Update ws update - ws_update - .object_changes - .entry(change.object_id.clone()) - .or_insert(CoinObjectUpdateStatus::Mutated(CoinMutated { + ws_update.object_changes.insert( + change.object_id.clone(), + CoinObjectUpdateStatus::Mutated(CoinMutated { change: change.amount, - })); + }), + ); } ObjectStatus::Deleted => { - // Update ws update ws_update .object_changes - .entry(change.object_id.clone()) - .or_insert(CoinObjectUpdateStatus::Deleted); + .insert(change.object_id.clone(), CoinObjectUpdateStatus::Deleted); + } + } + } + + // Generate notifications + for (sui_address, coin_changes) in &transaction_coin_changes { + if coin_changes.len() >= 3 { + // Potential swap scenario + let mut sui_change = 0i128; + let mut base_coin: Option<(String, i128)> = None; + let mut quote_coin: Option<(String, i128)> = None; + + for (coin_type, amount) in coin_changes { + if coin_type == "0x2::sui::SUI" { + sui_change = *amount; + } else if *amount < 0 && base_coin.is_none() { + base_coin = Some((coin_type.clone(), -amount)); + } else if *amount > 0 && quote_coin.is_none() { + quote_coin = Some((coin_type.clone(), *amount)); + } + } + + if sui_change < 0 && base_coin.is_some() && quote_coin.is_some() { + let (base_coin_type, base_amount) = base_coin.unwrap(); + let (quote_coin_type, quote_amount) = quote_coin.unwrap(); + notifications + .entry(transaction_id) + .or_insert_with(Vec::new) + .push(SuiIndexerNotification::CoinSwap(CoinSwap { + sui_address: sui_address.clone(), + base_coin_type, + base_amount, + quote_coin_type, + quote_amount, + })); + continue; + } + } + + // If not a swap, generate individual send/receive notifications + for (coin_type, amount) in coin_changes { + if coin_type == "0x2::sui::SUI" { + continue; + } + if *amount < 0 { + notifications + .entry(transaction_id) + .or_insert_with(Vec::new) + .push(SuiIndexerNotification::CoinSent(CoinSent { + sender_address: sui_address.clone(), + coin_type: coin_type.clone(), + amount: -amount, + })); + } else if *amount > 0 { + notifications + .entry(transaction_id) + .or_insert_with(Vec::new) + .push(SuiIndexerNotification::CoinReceived(CoinReceived { + receiver_address: sui_address.clone(), + coin_type: coin_type.clone(), + amount: *amount, + })); } } } - for (change_owner, object_change) in transaction.custom_object_changes.iter() { + // Process custom object changes + for (change_owner, object_change) in &transaction.custom_object_changes { if let Some(owner) = change_owner { + let owner_address = owner.to_string(); + let nft_id = object_change.object_id.to_string(); + + // Update account_object_changes for WS updates account_object_changes - .entry(owner.to_string()) + .entry(owner_address.clone()) .or_insert(AccountObjectsUpdate { - sui_address: owner.to_string(), + sui_address: owner_address.clone(), sequence_number: block_number, object_changes: HashMap::new(), - timestamp_ms: chrono::Utc::now().timestamp_millis() as u64, + timestamp_ms: Utc::now().timestamp_millis() as u64, }) .object_changes - .entry(object_change.object_id.clone()) - .or_insert(object_change.clone()); + .insert(object_change.object_id.clone(), object_change.clone()); + + objects_changes.push(object_change.clone()); + + // Generate notifications for NFT changes + match &object_change.status { + ObjectUpdateStatus::Created => { + notifications + .entry(transaction_id) + .or_insert_with(Vec::new) + .push(SuiIndexerNotification::NftMinted(NftMinted { + sui_address: owner_address, + nft_id, + })); + } + ObjectUpdateStatus::Received(received) => { + notifications + .entry(transaction_id) + .or_insert_with(Vec::new) + .push(SuiIndexerNotification::NftReceived(NftReceived { + receiver_address: received.receiver_address.clone(), + nft_id, + })); + } + ObjectUpdateStatus::Deleted => { + notifications + .entry(transaction_id) + .or_insert_with(Vec::new) + .push(SuiIndexerNotification::NftBurned(NftBurned { + sui_address: owner_address, + nft_id, + })); + } + ObjectUpdateStatus::Sent(sent) => { + notifications + .entry(transaction_id) + .or_insert_with(Vec::new) + .push(SuiIndexerNotification::NftSent(NftSent { + sender_address: sent.sender_address.clone(), + nft_id, + })); + } + _ => {} + } } - - objects_changes.push(object_change.clone()); } } - let updates: Vec = ws_balance_changes + // Prepare WS updates + let ws_updates: Vec = ws_balance_changes .into_values() - .map(|v| SuiWsApiMsg::TokenBalanceUpdate(v)) + .map(SuiWsApiMsg::TokenBalanceUpdate) .chain( account_object_changes .into_values() - .map(|v| SuiWsApiMsg::AccountObjectsUpdate(v)), - ) - .chain( - objects_changes - .into_iter() - .map(|v| SuiWsApiMsg::ObjectUpdate(v)), + .map(SuiWsApiMsg::AccountObjectsUpdate), ) + .chain(objects_changes.into_iter().map(SuiWsApiMsg::ObjectUpdate)) .collect(); - return (checkpoint_data.checkpoint.sequence_number, updates); + return ( + (checkpoint_data.checkpoint.sequence_number, ws_updates), + (checkpoint_data.checkpoint.sequence_number, notifications), + ); } diff --git a/crates/sui-indexer/src/main.rs b/crates/sui-indexer/src/main.rs index 1e9c691f8cb71..f57566cf3a831 100644 --- a/crates/sui-indexer/src/main.rs +++ b/crates/sui-indexer/src/main.rs @@ -4,7 +4,7 @@ use std::sync::Arc; use clap::Parser; -use odin::{get_odin, ConnectOptions, Odin}; +use odin::{ConnectOptions, Odin}; use sui_types::nats_queue::nats_queue; use tracing::info; diff --git a/crates/sui-types/src/nats_queue.rs b/crates/sui-types/src/nats_queue.rs index 4854c08387c5f..ad406090bd0b2 100644 --- a/crates/sui-types/src/nats_queue.rs +++ b/crates/sui-types/src/nats_queue.rs @@ -2,7 +2,7 @@ // This task is responsible for gathering all the messages and sending them in correct order // We will use a priority queue to store the messages and send them in correct order -use odin::{sui_ws::SuiWsApiMsg, Odin}; +use odin::{structs::sui_notifications::SuiIndexerNotification, sui_ws::SuiWsApiMsg, Odin}; use std::{collections::BTreeMap, sync::Arc}; use tokio::sync::{ mpsc::{channel, Receiver, Sender}, @@ -11,22 +11,29 @@ use tokio::sync::{ use tracing::info; pub type WsPayload = (u64, Vec); +pub type NotificationPayload = (u64, BTreeMap>); pub struct NatsQueueSender { pub init_checkpoint: u64, - pub sender: Arc>, - pub receiver: Arc>>, + pub ws_sender: Arc>, + pub ws_receiver: Arc>>, + pub notifications_sender: Arc>, + pub notifications_receiver: Arc>>, odin: Arc, } pub fn nats_queue(odin: Arc) -> NatsQueueSender { - // Create sender and receiver - let (tx, rx) = channel::(10_000); // 10k + // Create sender and receiver for ws updates + let (tx, rx) = channel::(10_000); + // Create sender and receiver for notifications + let (tx_notifications, rx_notifications) = channel::(10_000); NatsQueueSender { init_checkpoint: u64::MAX, - sender: Arc::new(tx), - receiver: Arc::new(Mutex::new(rx)), + ws_sender: Arc::new(tx), + ws_receiver: Arc::new(Mutex::new(rx)), + notifications_sender: Arc::new(tx_notifications), + notifications_receiver: Arc::new(Mutex::new(rx_notifications)), odin, } } @@ -35,9 +42,10 @@ impl NatsQueueSender { pub async fn run(&mut self) { // Spawn task that will order the messages let odin = self.odin.clone(); - let receiver = self.receiver.clone(); + let receiver = self.ws_receiver.clone(); let init_checkpoint = self.init_checkpoint; + // Task for ws updates tokio::spawn(async move { let mut next_index: u64 = init_checkpoint; // MAX means we have not received any message yet @@ -96,5 +104,73 @@ impl NatsQueueSender { } } }); + + // Task for notifications + let odin = self.odin.clone(); + let notifications_receiver = self.notifications_receiver.clone(); + tokio::spawn(async move { + let mut next_index: u64 = init_checkpoint; // MAX means we have not received any message yet + + let mut receiver_lock = notifications_receiver.lock().await; + + // Log init checkpoint + info!( + "Nats notifications queue init checkpoint: {}", + init_checkpoint + ); + + //Cache if we get a message with a block number that is not in order + let mut bmap_checkpoints: BTreeMap>> = + BTreeMap::new(); + + while let Some((checkpoint_seq_number, notifications)) = receiver_lock.recv().await { + // Check if we have not received any message yet + if next_index == u64::MAX { + next_index = checkpoint_seq_number + } + // Check if correct order + if checkpoint_seq_number == next_index { + // Send message + info!( + "Sending: {} notifications with seq number {}", + notifications.len(), + next_index + ); + + // Iter over notifications and ordered by sequence number send them + for (_, notifications) in notifications.iter() { + odin.publish_sui_notifications(¬ifications).await; + } + + // Update next index + next_index = next_index + 1; + // Check if we have any cached messages + while let Some(next_checkpoint) = bmap_checkpoints.remove(&next_index) { + info!( + "Sending: {} cached notifications with seq number {}", + next_checkpoint.len(), + next_index + ); + // Iter over notifications and ordered by sequence number send them + for (_, notifications) in notifications.iter() { + odin.publish_sui_notifications(¬ifications).await; + } + + // Update next index + next_index = next_index + 1; + } + } else { + info!( + "Received checkpoint with seq number {} but expected {}", + checkpoint_seq_number, next_index + ); + // Cache message + bmap_checkpoints + .entry(checkpoint_seq_number) + .or_insert(BTreeMap::new()) + .extend(notifications); + } + } + }); } } From f78cf6e64930138b6ba68f5cfb9fa5a99b7fe5aa Mon Sep 17 00:00:00 2001 From: Giems <109511301+Giems@users.noreply.github.com> Date: Tue, 20 Aug 2024 10:32:01 +0200 Subject: [PATCH 2/3] update for coin balances notifications --- Cargo.lock | 179 ++++++++++++- Cargo.toml | 2 +- .../src/handlers/checkpoint_handler.rs | 240 ++++++++++++++---- crates/sui-json-rpc/src/balance_changes.rs | 9 + 4 files changed, 375 insertions(+), 55 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index e0cfbae51c013..066d46510dc18 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3977,6 +3977,15 @@ dependencies = [ "scopeguard", ] +[[package]] +name = "erased-serde" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6c138974f9d5e7fe373eb04df7cae98833802ae4b11c24ac7039a21d5af4b26c" +dependencies = [ + "serde", +] + [[package]] name = "errno" version = "0.3.8" @@ -4483,6 +4492,20 @@ version = "2.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6999dc1837253364c2ebb0704ba97994bd874e8f195d665c50b7548f6ea92764" +[[package]] +name = "fcm" +version = "0.9.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5d8d0da8a6bd63bdec888b6d7a87c5698230005c1800823d28ddd5adb6f2550f" +dependencies = [ + "chrono", + "erased-serde", + "log", + "reqwest 0.11.20", + "serde", + "serde_json", +] + [[package]] name = "fd-lock" version = "3.0.13" @@ -4584,6 +4607,19 @@ dependencies = [ "winapi", ] +[[package]] +name = "firebase" +version = "0.1.0" +source = "git+ssh://git@github.com/nightly-labs/alexandria.git?rev=017d52bca2e11f6cdd98fe5fc5186e01b95c730f#017d52bca2e11f6cdd98fe5fc5186e01b95c730f" +dependencies = [ + "fcm", + "reqwest 0.12.4", + "serde", + "serde_json", + "structs", + "tokio", +] + [[package]] name = "fixed-hash" version = "0.7.0" @@ -4655,6 +4691,21 @@ version = "1.0.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1" +[[package]] +name = "foreign-types" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f6f339eb8adc052cd2ca78910fda869aefa38d22d5cb648e6485e4d3fc06f3b1" +dependencies = [ + "foreign-types-shared", +] + +[[package]] +name = "foreign-types-shared" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "00b0228411908ca8685dba7fc2cdd70ec9990a6e753e89b6ac91a84c40fbaf4b" + [[package]] name = "form_urlencoded" version = "1.1.0" @@ -5462,6 +5513,35 @@ dependencies = [ "tokio-io-timeout", ] +[[package]] +name = "hyper-tls" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d6183ddfa99b85da61a140bea0efc93fdf56ceaa041b37d553518030827f9905" +dependencies = [ + "bytes", + "hyper 0.14.26", + "native-tls", + "tokio", + "tokio-native-tls", +] + +[[package]] +name = "hyper-tls" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "70206fc6890eaca9fde8a0bf71caa2ddfc9fe045ac9e5c70df101a7dbde866e0" +dependencies = [ + "bytes", + "http-body-util", + "hyper 1.4.1", + "hyper-util", + "native-tls", + "tokio", + "tokio-native-tls", + "tower-service", +] + [[package]] name = "hyper-util" version = "0.1.7" @@ -7904,6 +7984,23 @@ dependencies = [ "typed-store", ] +[[package]] +name = "native-tls" +version = "0.2.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a8614eb2c83d59d1c8cc974dd3f920198647674a0a035e1af1fa58707e317466" +dependencies = [ + "libc", + "log", + "openssl", + "openssl-probe", + "openssl-sys", + "schannel", + "security-framework", + "security-framework-sys", + "tempfile", +] + [[package]] name = "neptune" version = "13.0.0" @@ -8384,10 +8481,11 @@ dependencies = [ [[package]] name = "odin" version = "0.1.0" -source = "git+ssh://git@github.com/nightly-labs/alexandria.git?rev=063e6ff#063e6ff6ed0e6fe8ba6c92664498350ece3e88ea" +source = "git+ssh://git@github.com/nightly-labs/alexandria.git?rev=017d52bca2e11f6cdd98fe5fc5186e01b95c730f#017d52bca2e11f6cdd98fe5fc5186e01b95c730f" dependencies = [ "async-nats", "bitcode", + "firebase", "futures", "serde", "serde_json", @@ -8472,12 +8570,50 @@ dependencies = [ "serde_json", ] +[[package]] +name = "openssl" +version = "0.10.66" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9529f4786b70a3e8c61e11179af17ab6188ad8d0ded78c5529441ed39d4bd9c1" +dependencies = [ + "bitflags 2.4.1", + "cfg-if", + "foreign-types", + "libc", + "once_cell", + "openssl-macros", + "openssl-sys", +] + +[[package]] +name = "openssl-macros" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a948666b637a0f465e8564c73e89d4dde00d72d4d473cc972f390fc3dcee7d9c" +dependencies = [ + "proc-macro2 1.0.78", + "quote 1.0.35", + "syn 2.0.48", +] + [[package]] name = "openssl-probe" version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ff011a302c396a5197692431fc1948019154afc178baf7d8e37367442a4601cf" +[[package]] +name = "openssl-sys" +version = "0.9.103" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7f9e8deee91df40a943c71b917e5874b951d32a802526c85721ce3b776c929d6" +dependencies = [ + "cc", + "libc", + "pkg-config", + "vcpkg", +] + [[package]] name = "opentelemetry" version = "0.20.0" @@ -10033,10 +10169,12 @@ dependencies = [ "http 0.2.9", "http-body 0.4.5", "hyper 0.14.26", + "hyper-tls 0.5.0", "ipnet", "js-sys", "log", "mime", + "native-tls", "once_cell", "percent-encoding", "pin-project-lite", @@ -10044,6 +10182,7 @@ dependencies = [ "serde_json", "serde_urlencoded", "tokio", + "tokio-native-tls", "tower-service", "url", "wasm-bindgen", @@ -10061,6 +10200,7 @@ dependencies = [ "async-compression 0.4.12", "base64 0.22.1", "bytes", + "encoding_rs", "futures-channel", "futures-core", "futures-util", @@ -10070,11 +10210,13 @@ dependencies = [ "http-body-util", "hyper 1.4.1", "hyper-rustls 0.26.0", + "hyper-tls 0.6.0", "hyper-util", "ipnet", "js-sys", "log", "mime", + "native-tls", "once_cell", "percent-encoding", "pin-project-lite", @@ -10086,7 +10228,9 @@ dependencies = [ "serde_json", "serde_urlencoded", "sync_wrapper", + "system-configuration", "tokio", + "tokio-native-tls", "tokio-rustls 0.25.0", "tokio-util 0.7.10 (registry+https://github.com/rust-lang/crates.io-index)", "tower-service", @@ -11723,7 +11867,7 @@ checksum = "7da8b5736845d9f2fcb837ea5d9e2628564b3b043a70948a3f0b778838c5fb4f" [[package]] name = "structs" version = "0.1.0" -source = "git+ssh://git@github.com/nightly-labs/alexandria.git?rev=063e6ff#063e6ff6ed0e6fe8ba6c92664498350ece3e88ea" +source = "git+ssh://git@github.com/nightly-labs/alexandria.git?rev=017d52bca2e11f6cdd98fe5fc5186e01b95c730f#017d52bca2e11f6cdd98fe5fc5186e01b95c730f" dependencies = [ "bitcode", "dotenvy", @@ -14407,6 +14551,27 @@ dependencies = [ "winapi", ] +[[package]] +name = "system-configuration" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ba3a3adc5c275d719af8cb4272ea1c4a6d668a777f37e115f6d11ddbc1c8e0e7" +dependencies = [ + "bitflags 1.3.2", + "core-foundation", + "system-configuration-sys", +] + +[[package]] +name = "system-configuration-sys" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a75fb188eb626b924683e3b95e3a48e63551fcfb51949de2f06a9d91dbee93c9" +dependencies = [ + "core-foundation-sys", + "libc", +] + [[package]] name = "tabled" version = "0.12.2" @@ -14846,6 +15011,16 @@ dependencies = [ "syn 2.0.48", ] +[[package]] +name = "tokio-native-tls" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bbae76ab933c85776efabc971569dd6119c580d8f5d448769dec1764bf796ef2" +dependencies = [ + "native-tls", + "tokio", +] + [[package]] name = "tokio-retry" version = "0.3.0" diff --git a/Cargo.toml b/Cargo.toml index 0536ab87ebc02..9124092502b41 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -716,7 +716,7 @@ semver = "1.0.16" spinners = "4.1.0" include_dir = "0.7.3" -odin = { git = "ssh://git@github.com/nightly-labs/alexandria.git", rev = "063e6ff", package = "odin" } +odin = { git = "ssh://git@github.com/nightly-labs/alexandria.git", rev = "017d52bca2e11f6cdd98fe5fc5186e01b95c730f", package = "odin" } [patch.crates-io] quinn-proto = { git = "https://github.com/quinn-rs/quinn.git", rev = "f0fa66f871b80b9d2d7075d76967c649aecc0b77" } diff --git a/crates/sui-indexer/src/handlers/checkpoint_handler.rs b/crates/sui-indexer/src/handlers/checkpoint_handler.rs index 5e81eca6f06d1..15957ee76d9dc 100644 --- a/crates/sui-indexer/src/handlers/checkpoint_handler.rs +++ b/crates/sui-indexer/src/handlers/checkpoint_handler.rs @@ -47,7 +47,7 @@ use sui_types::event::SystemEpochInfoEvent; use sui_types::object::Owner; use sui_types::transaction::TransactionDataAPI; use tap::tap::TapFallible; -use tracing::{info, warn}; +use tracing::{error, info, warn}; use sui_types::base_types::ObjectID; use sui_types::sui_system_state::sui_system_state_summary::SuiSystemStateSummary; @@ -976,6 +976,8 @@ pub fn generate_updates_from_checkpoint_data( for transaction in &checkpoint_data.transactions { let transaction_id = transaction.tx_sequence_number; let mut transaction_coin_changes: HashMap> = HashMap::new(); + let mut gas_change: i128 = 0; + let mut gas_owner: Option = None; // Process balance changes for change in &transaction.balance_change { @@ -983,6 +985,13 @@ pub fn generate_updates_from_checkpoint_data( Ok(address) => address.to_string(), Err(_) => continue, }; + + if change.coin_type == TypeTag::Bool { + // This is for out custom sui gas change for transaction + gas_change = change.amount; + gas_owner = Some(user_address.clone()); + continue; + } let coin_type = change.coin_type.to_canonical_string(true); // Update transaction_coin_changes for notifications @@ -1034,66 +1043,193 @@ pub fn generate_updates_from_checkpoint_data( } } - // Generate notifications - for (sui_address, coin_changes) in &transaction_coin_changes { - if coin_changes.len() >= 3 { - // Potential swap scenario - let mut sui_change = 0i128; - let mut base_coin: Option<(String, i128)> = None; - let mut quote_coin: Option<(String, i128)> = None; - - for (coin_type, amount) in coin_changes { - if coin_type == "0x2::sui::SUI" { - sui_change = *amount; - } else if *amount < 0 && base_coin.is_none() { - base_coin = Some((coin_type.clone(), -amount)); - } else if *amount > 0 && quote_coin.is_none() { - quote_coin = Some((coin_type.clone(), *amount)); + // Generate notifications, There should always be at least one coin change for each transaction, SUI + // gas owner should be present in the transaction at this point + if let Some(gas_owner) = gas_owner { + for (sui_address, coin_changes) in &transaction_coin_changes { + if coin_changes.len() == 1 { + // First check if single change is happening for gas owner + if sui_address == &gas_owner { + // Either sui was used for gas or user has sent sui to another user while also paying gas + let coin_type = coin_changes.keys().next().unwrap(); + let amount = coin_changes.values().next().unwrap(); + + if coin_type != "0x2::sui::SUI" { + error!("Unexpected coin type for single coin change: {}", coin_type); + continue; + } + + // Check if balance change is equal to gas change, + // based on the data it we should be able to determine if it was a gas payment or a transfer + if *amount == gas_change { + // Gas payment, do nothing + } else { + // Sui transfer + notifications + .entry(transaction_id) + .or_insert_with(Vec::new) + .push(SuiIndexerNotification::CoinSent(CoinSent { + sender_address: sui_address.clone(), + coin_type: coin_type.clone(), + amount: *amount, + })); + } + } else { + // Single coin change for non-gas owner, very likely a receive + let coin_type = coin_changes.keys().next().unwrap(); + let amount = coin_changes.values().next().unwrap(); + + notifications + .entry(transaction_id) + .or_insert_with(Vec::new) + .push(SuiIndexerNotification::CoinReceived(CoinReceived { + receiver_address: sui_address.clone(), + coin_type: coin_type.clone(), + amount: *amount, + })); } - } - if sui_change < 0 && base_coin.is_some() && quote_coin.is_some() { - let (base_coin_type, base_amount) = base_coin.unwrap(); - let (quote_coin_type, quote_amount) = quote_coin.unwrap(); - notifications - .entry(transaction_id) - .or_insert_with(Vec::new) - .push(SuiIndexerNotification::CoinSwap(CoinSwap { - sui_address: sui_address.clone(), - base_coin_type, - base_amount, - quote_coin_type, - quote_amount, - })); continue; } - } - // If not a swap, generate individual send/receive notifications - for (coin_type, amount) in coin_changes { - if coin_type == "0x2::sui::SUI" { + if coin_changes.len() == 2 { + // Two coin changes, two possible scenarios + // 1. Sui gas payment and a coin transfer to another user + // 2. Swap using Sui and another coin + + let mut sui_change = 0i128; + let mut other_change = 0i128; + let mut other_token_type = String::new(); + + for (coin_type, amount) in coin_changes { + if coin_type == "0x2::sui::SUI" { + sui_change = *amount; + } else { + other_change = *amount; + other_token_type = coin_type.clone(); + } + } + + if sui_change == gas_change { + // Sui gas payment and a coin transfer to another user + notifications + .entry(transaction_id) + .or_insert_with(Vec::new) + .push(SuiIndexerNotification::CoinSent(CoinSent { + sender_address: sui_address.clone(), + coin_type: other_token_type, + amount: other_change, + })); + } else { + // Swap using Sui and another coin, base is a token which we swap to receive another token + // So base is the token with negative change and quote is the token with positive change + let (base_coin_type, base_amount, quote_coin_type, quote_amount) = + if sui_change < 0 { + ( + "0x2::sui::SUI", + sui_change, + other_token_type.as_str(), + other_change, + ) + } else { + ( + other_token_type.as_str(), + other_change, + "0x2::sui::SUI", + sui_change, + ) + }; + + notifications + .entry(transaction_id) + .or_insert_with(Vec::new) + .push(SuiIndexerNotification::CoinSwap(CoinSwap { + sui_address: sui_address.clone(), + spent_token_type: base_coin_type.to_string(), + spent_amount: -base_amount, + received: vec![(quote_coin_type.to_string(), quote_amount)], + })); + } + } + + // By this point we have multiple coin changes in the transaction for the user, which means swap or multiple sends/receives + let mut positive_changes: Vec<(String, i128)> = coin_changes + .iter() + .filter(|(_, amount)| **amount > 0) + .map(|(coin_type, amount)| (coin_type.clone(), *amount)) + .collect(); + + // order positive changes by amount, largest first (3, 2, 1) + positive_changes.sort_by(|a, b| b.1.cmp(&a.1)); + + let mut negative_changes: Vec<(String, i128)> = coin_changes + .iter() + .filter(|(_, amount)| **amount < 0) + .map(|(coin_type, amount)| (coin_type.clone(), *amount)) + .collect(); + + // order negative changes by amount, largest first (-3, -2, -1) + negative_changes.sort_by(|a, b| a.1.cmp(&b.1)); + + // Few possible scenarios + // 1. Only negative changes, user sent multiple coins + // 2. Only positive changes, user received multiple coins + // 3. If there is at least one negative change and one positive change, it's a swap, + // biggest positive change is the quote coin as the rest should be just dust + + // 1. + if positive_changes.is_empty() { + for (coin_type, amount) in negative_changes { + if coin_type == "0x2::sui::SUI" { + // Check if SUI change was equal to gas change + if amount == gas_change { + // Gas payment, do nothing + continue; + } + } + + notifications + .entry(transaction_id) + .or_insert_with(Vec::new) + .push(SuiIndexerNotification::CoinSent(CoinSent { + sender_address: sui_address.clone(), + coin_type: coin_type.clone(), + amount: -amount, + })); + } continue; } - if *amount < 0 { - notifications - .entry(transaction_id) - .or_insert_with(Vec::new) - .push(SuiIndexerNotification::CoinSent(CoinSent { - sender_address: sui_address.clone(), - coin_type: coin_type.clone(), - amount: -amount, - })); - } else if *amount > 0 { - notifications - .entry(transaction_id) - .or_insert_with(Vec::new) - .push(SuiIndexerNotification::CoinReceived(CoinReceived { - receiver_address: sui_address.clone(), - coin_type: coin_type.clone(), - amount: *amount, - })); + + // 2. + if negative_changes.is_empty() { + for (coin_type, amount) in positive_changes { + notifications + .entry(transaction_id) + .or_insert_with(Vec::new) + .push(SuiIndexerNotification::CoinReceived(CoinReceived { + receiver_address: sui_address.clone(), + coin_type: coin_type.clone(), + amount, + })); + } + continue; } + + // 3. + let (base_coin_type, base_amount) = negative_changes.remove(0); + + notifications + .entry(transaction_id) + .or_insert_with(Vec::new) + .push(SuiIndexerNotification::CoinSwap(CoinSwap { + sui_address: sui_address.clone(), + spent_token_type: base_coin_type.clone(), + spent_amount: -base_amount, + received: positive_changes, + })); } + } else { + error!("Gas owner not found for transaction {}", transaction_id); } // Process custom object changes diff --git a/crates/sui-json-rpc/src/balance_changes.rs b/crates/sui-json-rpc/src/balance_changes.rs index c7abc73a6f40b..4a4de9f181c74 100644 --- a/crates/sui-json-rpc/src/balance_changes.rs +++ b/crates/sui-json-rpc/src/balance_changes.rs @@ -260,6 +260,15 @@ pub async fn get_balance_changes_with_status_from_effect Date: Tue, 20 Aug 2024 11:15:45 +0200 Subject: [PATCH 3/3] cr fixes --- Cargo.lock | 6 +++--- Cargo.toml | 2 +- crates/sui-indexer/src/handlers/checkpoint_handler.rs | 8 ++------ 3 files changed, 6 insertions(+), 10 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 066d46510dc18..44dd32541a47d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4610,7 +4610,7 @@ dependencies = [ [[package]] name = "firebase" version = "0.1.0" -source = "git+ssh://git@github.com/nightly-labs/alexandria.git?rev=017d52bca2e11f6cdd98fe5fc5186e01b95c730f#017d52bca2e11f6cdd98fe5fc5186e01b95c730f" +source = "git+ssh://git@github.com/nightly-labs/alexandria.git?rev=bd401bc3807bba5bb5203f42cc7b8cf836be83f5#bd401bc3807bba5bb5203f42cc7b8cf836be83f5" dependencies = [ "fcm", "reqwest 0.12.4", @@ -8481,7 +8481,7 @@ dependencies = [ [[package]] name = "odin" version = "0.1.0" -source = "git+ssh://git@github.com/nightly-labs/alexandria.git?rev=017d52bca2e11f6cdd98fe5fc5186e01b95c730f#017d52bca2e11f6cdd98fe5fc5186e01b95c730f" +source = "git+ssh://git@github.com/nightly-labs/alexandria.git?rev=bd401bc3807bba5bb5203f42cc7b8cf836be83f5#bd401bc3807bba5bb5203f42cc7b8cf836be83f5" dependencies = [ "async-nats", "bitcode", @@ -11867,7 +11867,7 @@ checksum = "7da8b5736845d9f2fcb837ea5d9e2628564b3b043a70948a3f0b778838c5fb4f" [[package]] name = "structs" version = "0.1.0" -source = "git+ssh://git@github.com/nightly-labs/alexandria.git?rev=017d52bca2e11f6cdd98fe5fc5186e01b95c730f#017d52bca2e11f6cdd98fe5fc5186e01b95c730f" +source = "git+ssh://git@github.com/nightly-labs/alexandria.git?rev=bd401bc3807bba5bb5203f42cc7b8cf836be83f5#bd401bc3807bba5bb5203f42cc7b8cf836be83f5" dependencies = [ "bitcode", "dotenvy", diff --git a/Cargo.toml b/Cargo.toml index 9124092502b41..6c02cfcd7dfcc 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -716,7 +716,7 @@ semver = "1.0.16" spinners = "4.1.0" include_dir = "0.7.3" -odin = { git = "ssh://git@github.com/nightly-labs/alexandria.git", rev = "017d52bca2e11f6cdd98fe5fc5186e01b95c730f", package = "odin" } +odin = { git = "ssh://git@github.com/nightly-labs/alexandria.git", rev = "bd401bc3807bba5bb5203f42cc7b8cf836be83f5", package = "odin" } [patch.crates-io] quinn-proto = { git = "https://github.com/quinn-rs/quinn.git", rev = "f0fa66f871b80b9d2d7075d76967c649aecc0b77" } diff --git a/crates/sui-indexer/src/handlers/checkpoint_handler.rs b/crates/sui-indexer/src/handlers/checkpoint_handler.rs index 15957ee76d9dc..3009faeca7b26 100644 --- a/crates/sui-indexer/src/handlers/checkpoint_handler.rs +++ b/crates/sui-indexer/src/handlers/checkpoint_handler.rs @@ -1145,8 +1145,7 @@ pub fn generate_updates_from_checkpoint_data( .or_insert_with(Vec::new) .push(SuiIndexerNotification::CoinSwap(CoinSwap { sui_address: sui_address.clone(), - spent_token_type: base_coin_type.to_string(), - spent_amount: -base_amount, + spent: vec![(base_coin_type.to_string(), -base_amount)], received: vec![(quote_coin_type.to_string(), quote_amount)], })); } @@ -1216,15 +1215,12 @@ pub fn generate_updates_from_checkpoint_data( } // 3. - let (base_coin_type, base_amount) = negative_changes.remove(0); - notifications .entry(transaction_id) .or_insert_with(Vec::new) .push(SuiIndexerNotification::CoinSwap(CoinSwap { sui_address: sui_address.clone(), - spent_token_type: base_coin_type.clone(), - spent_amount: -base_amount, + spent: negative_changes, received: positive_changes, })); }