Skip to content

Commit

Permalink
init notifications generation
Browse files Browse the repository at this point in the history
  • Loading branch information
Giems committed Aug 14, 2024
1 parent 260be09 commit 84d5e65
Show file tree
Hide file tree
Showing 5 changed files with 258 additions and 11 deletions.
27 changes: 25 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -347,7 +347,7 @@ hashbrown = "0.12"
hdrhistogram = "7.5.1"
hex = "0.4.3"
hex-literal = "0.3.4"
highlight = "all"
highlight = "0.0.3"
http = "0.2.8"
http-body = "0.4.5"
humantime = "2.1.0"
Expand Down Expand Up @@ -716,7 +716,7 @@ semver = "1.0.16"
spinners = "4.1.0"
include_dir = "0.7.3"

odin = { git = "ssh://[email protected]/nightly-labs/alexandria.git", rev = "ce64446", package = "odin" }
odin = { git = "ssh://[email protected]/nightly-labs/alexandria.git", rev = "063e6ff", package = "odin" }

[patch.crates-io]
quinn-proto = { git = "https://github.com/quinn-rs/quinn.git", rev = "f0fa66f871b80b9d2d7075d76967c649aecc0b77" }
148 changes: 148 additions & 0 deletions crates/sui-indexer/src/handlers/checkpoint_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,13 @@ use itertools::Itertools;
use move_core_types::annotated_value::{MoveStructLayout, MoveTypeLayout};
use move_core_types::language_storage::{StructTag, TypeTag};
use mysten_metrics::{get_metrics, spawn_monitored_task};
use odin::structs::sui_notifications::{
CoinReceived, CoinSent, CoinSwap, NftBurned, NftMinted, NftReceived, NftSent,
SuiIndexerNotification,
};
use odin::sui_ws::{
AccountObjectsUpdate, CoinCreated, CoinMutated, CoinObjectUpdateStatus, ObjectChangeUpdate,
ObjectUpdateStatus,
};
use odin::sui_ws::{SuiWsApiMsg, TokenBalanceUpdate, TokenUpdate};
use std::collections::{BTreeMap, HashMap};
Expand Down Expand Up @@ -1052,3 +1057,146 @@ pub fn generate_ws_updates_from_checkpoint_data(

return (checkpoint_data.checkpoint.sequence_number, updates);
}

// for now just clone the method from above and modify it to return SuiIndexerNotification
pub fn generate_notifications_updates_from_checkpoint_data(
checkpoint_data: &CustomCheckpointDataToCommit,
) -> (u64, BTreeMap<u64, Vec<SuiIndexerNotification>>) {
let mut notifications: BTreeMap<u64, Vec<SuiIndexerNotification>> = BTreeMap::new();

for transaction in &checkpoint_data.transactions {
let mut transaction_coin_changes: HashMap<String, HashMap<String, i128>> = HashMap::new(); // user_address -> coin_type -> total_change
let transaction_id = transaction.tx_sequence_number;

// Process balance changes
for change in transaction.balance_change.iter() {
let user_address = match change.owner.get_owner_address() {
Ok(address) => address.to_string(),
Err(_) => continue,
};
let coin_type = change.coin_type.to_canonical_string(true);

let total_change = transaction_coin_changes
.entry(user_address)
.or_insert_with(HashMap::new)
.entry(coin_type)
.or_insert(0);

*total_change += change.amount;
}

// Generate notifications
for (sui_address, coin_changes) in transaction_coin_changes.iter() {
if coin_changes.len() < 3 {
continue; // Not enough changes for a swap
}

let mut sui_change = 0i128;
let mut base_coin: Option<(String, i128)> = None;
let mut quote_coin: Option<(String, i128)> = None;

for (coin_type, amount) in coin_changes.iter() {
if coin_type == "0x2::sui::SUI" {
sui_change = *amount;
} else if *amount < 0 && base_coin.is_none() {
base_coin = Some((coin_type.clone(), -amount)); // Store positive amount
} else if *amount > 0 && quote_coin.is_none() {
quote_coin = Some((coin_type.clone(), *amount));
}
}

// Check if we have a valid swap scenario
if sui_change < 0 && base_coin.is_some() && quote_coin.is_some() {
let (base_coin_type, base_amount) = base_coin.unwrap();
let (quote_coin_type, quote_amount) = quote_coin.unwrap();

notifications
.entry(transaction_id)
.or_insert_with(Vec::new)
.push(SuiIndexerNotification::CoinSwap(CoinSwap {
sui_address: sui_address.clone(),
base_coin_type,
base_amount,
quote_coin_type,
quote_amount,
}));
} else {
// If not a swap, generate individual send/receive notifications
for (coin_type, amount) in coin_changes.iter() {
if coin_type == "0x2::sui::SUI" {
continue; // Skip SUI changes
}
if *amount < 0 {
notifications
.entry(transaction_id)
.or_insert_with(Vec::new)
.push(SuiIndexerNotification::CoinSent(CoinSent {
sender_address: sui_address.clone(),
coin_type: coin_type.clone(),
amount: -amount, // Convert to positive
}));
} else if *amount > 0 {
notifications
.entry(transaction_id)
.or_insert_with(Vec::new)
.push(SuiIndexerNotification::CoinReceived(CoinReceived {
receiver_address: sui_address.clone(),
coin_type: coin_type.clone(),
amount: *amount,
}));
}
}
}
}

// Process custom object changes (NFT part remains unchanged)
for (change_owner, object_change) in &transaction.custom_object_changes {
if let Some(owner) = change_owner {
let owner_address = owner.to_string();
let nft_id = object_change.object_id.to_string();

match &object_change.status {
ObjectUpdateStatus::Created => {
notifications
.entry(transaction_id)
.or_insert_with(Vec::new)
.push(SuiIndexerNotification::NftMinted(NftMinted {
sui_address: owner_address,
nft_id,
}));
}
ObjectUpdateStatus::Received(received) => {
notifications
.entry(transaction_id)
.or_insert_with(Vec::new)
.push(SuiIndexerNotification::NftReceived(NftReceived {
receiver_address: received.receiver_address.clone(),
nft_id,
}));
}
ObjectUpdateStatus::Deleted => {
notifications
.entry(transaction_id)
.or_insert_with(Vec::new)
.push(SuiIndexerNotification::NftBurned(NftBurned {
sui_address: owner_address,
nft_id,
}));
}
ObjectUpdateStatus::Sent(sent) => {
notifications
.entry(transaction_id)
.or_insert_with(Vec::new)
.push(SuiIndexerNotification::NftSent(NftSent {
sender_address: sent.sender_address.clone(),
nft_id,
}));
}
_ => {}
}
}
}
}

(checkpoint_data.checkpoint.sequence_number, notifications)
}
8 changes: 4 additions & 4 deletions crates/sui-indexer/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
use std::sync::Arc;

use clap::Parser;
use odin::{get_odin, ConnectOptions, Odin};
use odin::{ConnectOptions, Odin};
use sui_types::nats_queue::nats_queue;
use tracing::info;

Expand Down Expand Up @@ -42,11 +42,11 @@ async fn main() -> Result<(), IndexerError> {
// TODO update on launch
let odin = Odin::connect(
Some(vec![
"nats://localhost:4228".to_string(),
"nats://localhost:4229".to_string(),
"nats://65.108.110.11:4222".to_string(),
"nats://95.216.243.233:4222".to_string(),
]),
Some(ConnectOptions::with_user_and_password(
"alexandria".to_string(),
"very-stronk-alexandria".to_string(),
"alexandria".to_string(),
)),
)
Expand Down
82 changes: 79 additions & 3 deletions crates/sui-types/src/nats_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
// This task is responsible for gathering all the messages and sending them in correct order
// We will use a priority queue to store the messages and send them in correct order

use odin::{sui_ws::SuiWsApiMsg, Odin};
use odin::{structs::sui_notifications::SuiIndexerNotification, sui_ws::SuiWsApiMsg, Odin};
use std::{collections::BTreeMap, sync::Arc};
use tokio::sync::{
mpsc::{channel, Receiver, Sender},
Expand All @@ -11,22 +11,29 @@ use tokio::sync::{
use tracing::info;

pub type WsPayload = (u64, Vec<SuiWsApiMsg>);
pub type NotificationPayload = (u64, BTreeMap<u64, Vec<SuiIndexerNotification>>);

pub struct NatsQueueSender {
pub init_checkpoint: u64,
pub sender: Arc<Sender<WsPayload>>,
pub receiver: Arc<Mutex<Receiver<WsPayload>>>,
pub notifications_sender: Arc<Sender<NotificationPayload>>,
pub notifications_receiver: Arc<Mutex<Receiver<NotificationPayload>>>,
odin: Arc<Odin>,
}

pub fn nats_queue(odin: Arc<Odin>) -> NatsQueueSender {
// Create sender and receiver
let (tx, rx) = channel::<WsPayload>(10_000); // 10k
// Create sender and receiver for ws updates
let (tx, rx) = channel::<WsPayload>(10_000);
// Create sender and receiver for notifications
let (tx_notifications, rx_notifications) = channel::<NotificationPayload>(10_000);

NatsQueueSender {
init_checkpoint: u64::MAX,
sender: Arc::new(tx),
receiver: Arc::new(Mutex::new(rx)),
notifications_sender: Arc::new(tx_notifications),
notifications_receiver: Arc::new(Mutex::new(rx_notifications)),
odin,
}
}
Expand All @@ -38,6 +45,7 @@ impl NatsQueueSender {
let receiver = self.receiver.clone();
let init_checkpoint = self.init_checkpoint;

// Task for ws updates
tokio::spawn(async move {
let mut next_index: u64 = init_checkpoint; // MAX means we have not received any message yet

Expand Down Expand Up @@ -96,5 +104,73 @@ impl NatsQueueSender {
}
}
});

// Task for notifications
let odin = self.odin.clone();
let notifications_receiver = self.notifications_receiver.clone();
tokio::spawn(async move {
let mut next_index: u64 = init_checkpoint; // MAX means we have not received any message yet

let mut receiver_lock = notifications_receiver.lock().await;

// Log init checkpoint
info!(
"Nats notifications queue init checkpoint: {}",
init_checkpoint
);

//Cache if we get a message with a block number that is not in order
let mut bmap_checkpoints: BTreeMap<u64, BTreeMap<u64, Vec<SuiIndexerNotification>>> =
BTreeMap::new();

while let Some((checkpoint_seq_number, notifications)) = receiver_lock.recv().await {
// Check if we have not received any message yet
if next_index == u64::MAX {
next_index = checkpoint_seq_number
}
// Check if correct order
if checkpoint_seq_number == next_index {
// Send message
info!(
"Sending: {} notifications with seq number {}",
notifications.len(),
next_index
);

// Iter over notifications and ordered by sequence number send them
for (_, notifications) in notifications.iter() {
odin.publish_sui_notifications(&notifications).await;
}

// Update next index
next_index = next_index + 1;
// Check if we have any cached messages
while let Some(next_checkpoint) = bmap_checkpoints.remove(&next_index) {
info!(
"Sending: {} cached notifications with seq number {}",
next_checkpoint.len(),
next_index
);
// Iter over notifications and ordered by sequence number send them
for (_, notifications) in notifications.iter() {
odin.publish_sui_notifications(&notifications).await;
}

// Update next index
next_index = next_index + 1;
}
} else {
info!(
"Received checkpoint with seq number {} but expected {}",
checkpoint_seq_number, next_index
);
// Cache message
bmap_checkpoints
.entry(checkpoint_seq_number)
.or_insert(BTreeMap::new())
.extend(notifications);
}
}
});
}
}

0 comments on commit 84d5e65

Please sign in to comment.