diff --git a/Cargo.lock b/Cargo.lock index a54abdb97f..82610ad25b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -778,6 +778,50 @@ dependencies = [ "serde", ] +[[package]] +name = "benchmark-data-update" +version = "0.1.0" +dependencies = [ + "anchor-client", + "anchor-lang", + "anyhow", + "async-channel", + "async-trait", + "base64 0.21.4", + "bs58 0.3.1", + "bytemuck", + "chrono", + "csv", + "fixed 1.11.0 (git+https://github.com/blockworks-foundation/fixed.git?branch=v1.11.0-borsh0_10-mango)", + "futures 0.3.28", + "futures-channel", + "futures-core", + "futures-util", + "hdrhistogram", + "itertools", + "jemallocator", + "log 0.4.20", + "mango-feeds-connector", + "mango-feeds-lib", + "mango-v4", + "mango-v4-client", + "native-tls", + "rustls 0.20.9", + "serde", + "serde_derive", + "serde_json", + "serum_dex 0.5.10 (git+https://github.com/openbook-dex/program.git)", + "services-mango-lib", + "solana-client", + "solana-logger", + "solana-sdk", + "tokio", + "tokio-tungstenite 0.17.2", + "toml", + "tracing", + "ws", +] + [[package]] name = "bincode" version = "1.3.3" @@ -1485,6 +1529,27 @@ dependencies = [ "subtle", ] +[[package]] +name = "csv" +version = "1.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ac574ff4d437a7b5ad237ef331c17ccca63c46479e5b5453eb8e10bb99a759fe" +dependencies = [ + "csv-core", + "itoa", + "ryu", + "serde", +] + +[[package]] +name = "csv-core" +version = "0.1.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5efa2b3d7902f4b634a20cae3c9c4e6209dc4779feb6863329607560143efa70" +dependencies = [ + "memchr", +] + [[package]] name = "ctr" version = "0.8.0" @@ -3326,9 +3391,8 @@ dependencies = [ [[package]] name = "mango-feeds-connector" -version = "0.2.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0fcd440ee3dd5090a6f36bf8d9392ce7f9cc705828fdacf88b6022ddb7aeb895" +version = "0.2.2" +source = "git+https://github.com/blockworks-foundation/mango-feeds.git?branch=serge/custom_filtering#7165e7dfd3bbddd0c00a39f91e845bae1ce618ed" dependencies = [ "anyhow", "async-channel", @@ -3337,7 +3401,6 @@ dependencies = [ "itertools", "jsonrpc-core 18.0.0 (registry+https://github.com/rust-lang/crates.io-index)", "jsonrpc-core-client", - "log 0.4.20", "rustls 0.20.9", "serde", "serde_derive", @@ -3347,6 +3410,7 @@ dependencies = [ "solana-rpc", "solana-sdk", "tokio", + "tracing", "warp", "yellowstone-grpc-client", "yellowstone-grpc-proto", @@ -3540,10 +3604,12 @@ dependencies = [ "futures-core", "futures-util", "hdrhistogram", + "indexmap 2.0.0", "itertools", "jemallocator", "jsonrpc-core 18.0.0 (registry+https://github.com/rust-lang/crates.io-index)", "jsonrpc-core-client", + "mango-feeds-connector", "mango-v4", "mango-v4-client", "once_cell", @@ -3563,6 +3629,7 @@ dependencies = [ "tokio", "tokio-stream", "tokio-tungstenite 0.16.1", + "toml", "tracing", ] @@ -8913,11 +8980,10 @@ checksum = "b6bc1c9ce2b5135ac7f93c72918fc37feb872bdc6a5533a8b85eb4b86bfdae52" [[package]] name = "tracing" -version = "0.1.37" +version = "0.1.40" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8ce8c33a8d48bd45d624a6e523445fd21ec13d3653cd51f681abf67418f54eb8" +checksum = "c3523ab5a71916ccf420eebdf5521fcef02141234bbc0b8a49f2fdc4544364ef" dependencies = [ - "cfg-if 1.0.0", "log 0.4.20", "pin-project-lite", "tracing-attributes", @@ -8926,9 +8992,9 @@ dependencies = [ [[package]] name = "tracing-attributes" -version = 
"0.1.26" +version = "0.1.27" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5f4f31f56159e98206da9efd823404b79b6ef3143b4a7ab76e67b1751b25a4ab" +checksum = "34704c8d6ebcbc939824180af020566b01a7c01f80641264eba0999f6c2b6be7" dependencies = [ "proc-macro2 1.0.67", "quote 1.0.33", @@ -8937,9 +9003,9 @@ dependencies = [ [[package]] name = "tracing-core" -version = "0.1.31" +version = "0.1.32" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0955b8137a1df6f1a2e9a37d8a6656291ff0297c1a97c24e0d8425fe2312f79a" +checksum = "c06d3da6113f116aaee68e4d601191614c9053067f9ab7f6edbcb161237daa54" dependencies = [ "once_cell", "valuable", @@ -9800,9 +9866,8 @@ dependencies = [ [[package]] name = "yellowstone-grpc-client" -version = "1.9.0+solana.1.16.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "638fc820f10c6d836732d43f7c8f93a85301a7d90705a0db38b3bc11fc6657f6" +version = "1.10.0+solana.1.16.14" +source = "git+https://github.com/rpcpool/yellowstone-grpc.git?tag=v1.8.0+solana.1.16.14#a6cb542167f5eb9ec59e4a074c1671fa132e2f00" dependencies = [ "bytes 1.5.0", "futures 0.3.28", @@ -9815,9 +9880,8 @@ dependencies = [ [[package]] name = "yellowstone-grpc-proto" -version = "1.9.0+solana.1.16.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "22b29533f1a9486a84b2ffc1fc53d19607af3b71e0554dbde38a841b72026be6" +version = "1.9.0+solana.1.16.14" +source = "git+https://github.com/rpcpool/yellowstone-grpc.git?tag=v1.8.0+solana.1.16.14#a6cb542167f5eb9ec59e4a074c1671fa132e2f00" dependencies = [ "anyhow", "prost 0.11.9", diff --git a/Cargo.toml b/Cargo.toml index afb3f2adc6..c4e0448e0b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -13,7 +13,7 @@ fixed = { git = "https://github.com/blockworks-foundation/fixed.git", branch = " pyth-sdk-solana = "0.8.0" # commit c85e56d (0.5.10 plus dependency updates) serum_dex = { git = "https://github.com/openbook-dex/program.git", default-features=false } -mango-feeds-connector = "0.2.1" +mango-feeds-connector = { git = "https://github.com/blockworks-foundation/mango-feeds.git", branch = "serge/custom_filtering"} # 1.16.7+ is required due to this: https://github.com/blockworks-foundation/mango-v4/issues/712 solana-address-lookup-table-program = "~1.16.7" diff --git a/bin/benchmark-data-update/Cargo.toml b/bin/benchmark-data-update/Cargo.toml new file mode 100644 index 0000000000..185cd8f7c4 --- /dev/null +++ b/bin/benchmark-data-update/Cargo.toml @@ -0,0 +1,55 @@ +[package] +name = "benchmark-data-update" +version = "0.1.0" +authors = ["Serge Farny "] +edition = "2021" +license = "AGPL-3.0-or-later" + +[dependencies] +mango-feeds-connector = { workspace = true } +mango-feeds-lib = { path = "../../lib/mango-feeds-lib" } +services-mango-lib = { path = "../../lib/services-mango-lib" } + +solana-client = { workspace = true } +solana-logger = { workspace = true } +solana-sdk = { workspace = true } + +anchor-lang = { workspace = true } +anchor-client = { workspace = true } + +fixed = { workspace = true, features = ["serde", "borsh"] } + +mango-v4 = { path = "../../programs/mango-v4", features = ["client"] } +mango-v4-client = { path = "../../lib/client" } + +serum_dex = { workspace = true } + +bs58 = "0.3.1" +log = "0.4" +anyhow = "1.0" +toml = "0.5" +serde = "1.0.130" +serde_derive = "1.0.130" +serde_json = "1.0.68" +futures = "0.3.17" +futures-core = "0.3" +futures-channel = "0.3" +futures-util = 
"0.3" +ws = "^0.9.2" +async-channel = "1.6" +async-trait = "0.1" +bytemuck = "^1.7.2" +itertools = "0.10.3" +jemallocator = "0.3.2" +chrono = "0.4.23" +base64 = "0.21" + +tokio = { version = "1", features = ["full"] } +tokio-tungstenite = "0.17" + +native-tls = "0.2" +rustls = "0.20.8" +tracing = { version = "0.1", features = ["log"] } + +hdrhistogram = "7.5.4" +csv = "1.0" diff --git a/bin/benchmark-data-update/README.md b/bin/benchmark-data-update/README.md new file mode 100644 index 0000000000..495ff331d0 --- /dev/null +++ b/bin/benchmark-data-update/README.md @@ -0,0 +1,4 @@ +# benchmark data update + +Compare websocket and grpc connection performance + diff --git a/bin/benchmark-data-update/conf/example-config.toml b/bin/benchmark-data-update/conf/example-config.toml new file mode 100644 index 0000000000..b5172ed7b4 --- /dev/null +++ b/bin/benchmark-data-update/conf/example-config.toml @@ -0,0 +1,14 @@ +mango_group = "78b8f4cGCwmZ9ysPFMWLaLTkkaYnUjwMJYStWe5RTSSX" + +[source_configuration] +rpc_ws_url = "wss://mango.rpcpool.com/" +rpc_http_url = "http://mango.rpcpool.com/" +snapshot_interval_secs = 900 +use_grpc = false +dedup_queue_size = 50000 + +[[source_configuration.grpc_sources]] +name = "benchmark-data-update" +connection_string = "http://tyo64.rpcpool.com/" +#token = "" +retry_connection_sleep_secs = 30 diff --git a/bin/benchmark-data-update/src/configuration.rs b/bin/benchmark-data-update/src/configuration.rs new file mode 100644 index 0000000000..77a8b386e2 --- /dev/null +++ b/bin/benchmark-data-update/src/configuration.rs @@ -0,0 +1,25 @@ +use mango_feeds_connector::GrpcSourceConfig; +use serde_derive::Deserialize; +use services_mango_lib::env_helper::string_or_env; + +#[derive(Clone, Debug, Deserialize)] +pub struct Configuration { + #[serde(deserialize_with = "string_or_env")] + pub mango_group: String, + #[serde(deserialize_with = "string_or_env")] + pub export_csv_path: String, + pub source_configuration: SourceConfiguration, +} + +#[derive(Clone, Debug, Deserialize)] +pub struct SourceConfiguration { + #[serde(deserialize_with = "string_or_env")] + pub rpc_http_url: String, + #[serde(deserialize_with = "string_or_env")] + pub rpc_ws_url: String, + + pub snapshot_interval_secs: u64, + + pub dedup_queue_size: usize, + pub grpc_sources: Vec, +} diff --git a/bin/benchmark-data-update/src/main.rs b/bin/benchmark-data-update/src/main.rs new file mode 100644 index 0000000000..15adbb338b --- /dev/null +++ b/bin/benchmark-data-update/src/main.rs @@ -0,0 +1,82 @@ +mod configuration; +mod processors; + +use futures_util::StreamExt; +// use mango_feeds_connector::metrics; +use mango_v4_client::tracing_subscriber_init; +use std::fs::File; +use std::io::Read; +use std::sync::atomic::Ordering; + +use crate::configuration::Configuration; +use crate::processors::data::{DataEventSource, DataProcessor}; +use crate::processors::exit::ExitProcessor; +use crate::processors::exporter::ExporterProcessor; +use crate::processors::logger::LoggerProcessor; + +#[tokio::main] +async fn main() -> anyhow::Result<()> { + let args: Vec = std::env::args().collect(); + + if args.len() < 2 { + eprintln!("Please enter a config file path argument."); + return Ok(()); + } + + let configuration: Configuration = { + let mut file = File::open(&args[1])?; + let mut contents = String::new(); + file.read_to_string(&mut contents)?; + toml::from_str(&contents).unwrap() + }; + + tracing_subscriber_init(); + + let exit_processor = ExitProcessor::init().await?; + + let ws_processor: DataProcessor = DataProcessor::init( 
+ &configuration, + DataEventSource::Websocket, + exit_processor.exit.clone(), + ) + .await?; + let grpc_processor: DataProcessor = DataProcessor::init( + &configuration, + DataEventSource::Grpc, + exit_processor.exit.clone(), + ) + .await?; + + let logger_processor = LoggerProcessor::init( + &ws_processor.channel, + &grpc_processor.channel, + exit_processor.exit.clone(), + ) + .await?; + + let exporter_processor = ExporterProcessor::init( + &configuration, + &ws_processor.channel, + &grpc_processor.channel, + exit_processor.exit.clone(), + ) + .await?; + + let jobs = vec![ + exit_processor.job, + ws_processor.job, + grpc_processor.job, + logger_processor.job, + exporter_processor.job, + ]; + let mut jobs: futures::stream::FuturesUnordered<_> = jobs.into_iter().collect(); + + while let Some(_) = jobs.next().await { + // if any job exit, stop the others threads & wait + exit_processor.exit.store(true, Ordering::Relaxed); + } + + // for now, we force exit here because websocket connection to RPC is not properly closed on exit + tracing::warn!("killing process"); + std::process::exit(0x0100); +} diff --git a/bin/benchmark-data-update/src/processors/data.rs b/bin/benchmark-data-update/src/processors/data.rs new file mode 100644 index 0000000000..76f5e38f37 --- /dev/null +++ b/bin/benchmark-data-update/src/processors/data.rs @@ -0,0 +1,229 @@ +use crate::configuration::Configuration; +use crate::processors::data::DataEvent::{AccountUpdate, Other, Snapshot}; +use async_channel::Receiver; +use chrono::Utc; +use itertools::Itertools; +use mango_v4_client::account_update_stream::Message; +use mango_v4_client::{account_update_stream, grpc_source, websocket_source, MangoGroupContext}; +use services_mango_lib::fail_or_retry; +use services_mango_lib::retry_counter::RetryCounter; +use solana_client::nonblocking::rpc_client::RpcClient as RpcClientAsync; +use solana_sdk::commitment_config::CommitmentConfig; +use solana_sdk::pubkey::Pubkey; +use std::fmt::Display; +use std::str::FromStr; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::Arc; +use std::time::Duration; +use tokio::task::JoinHandle; +use tracing::warn; + +pub struct DataProcessor { + pub channel: tokio::sync::broadcast::Sender, + pub job: JoinHandle<()>, +} + +#[derive(Clone, Copy, Debug, PartialEq)] +pub enum DataEventSource { + Websocket, + Grpc, +} + +impl Display for DataEventSource { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{:?}", self) + } +} + +#[derive(Clone, Debug)] +pub enum DataEvent { + Other, + Snapshot(SnapshotEvent), + AccountUpdate(AccountUpdateEvent), +} + +#[derive(Clone, Debug)] +pub struct SnapshotEvent { + pub received_at: chrono::DateTime, + pub accounts: Vec, + pub source: DataEventSource, + pub slot: u64, +} + +#[derive(Clone, Debug)] +pub struct AccountUpdateEvent { + pub received_at: chrono::DateTime, + pub account: Pubkey, + pub source: DataEventSource, + pub slot: u64, +} + +impl DataProcessor { + pub async fn init( + configuration: &Configuration, + source: DataEventSource, + exit: Arc, + ) -> anyhow::Result { + let mut retry_counter = RetryCounter::new(2); + let mango_stream = fail_or_retry!( + retry_counter, + Self::init_mango_source(configuration, source, exit.clone()).await + )?; + let (sender, _) = tokio::sync::broadcast::channel(8192); + let sender_clone = sender.clone(); + + let job = tokio::spawn(async move { + loop { + if exit.load(Ordering::Relaxed) { + warn!("shutting down data processor..."); + break; + } + tokio::select! 
{ + Ok(msg) = mango_stream.recv() => { + let received_at = Utc::now(); + if sender_clone.receiver_count() == 0 { + continue; + } + + let event = Self::parse_message(msg, source, received_at); + + if event.is_none() { + continue; + } + + let res = sender_clone.send(event.unwrap()); + if res.is_err() { + break; + } + }, + else => { + warn!("mango update channel err"); + break; + } + } + } + }); + + let result = DataProcessor { + channel: sender, + job, + }; + + Ok(result) + } + + fn new_rpc_async(configuration: &Configuration) -> RpcClientAsync { + let commitment = CommitmentConfig::processed(); + RpcClientAsync::new_with_timeout_and_commitment( + configuration.source_configuration.rpc_http_url.clone(), + Duration::from_secs(60), + commitment, + ) + } + + fn parse_message( + message: Message, + source: DataEventSource, + received_at: chrono::DateTime, + ) -> Option { + match message { + Message::Account(account_write) => { + return Some(AccountUpdate(AccountUpdateEvent { + account: account_write.pubkey, + received_at, + source, + slot: account_write.slot, + })); + } + Message::Snapshot(snapshot, _) => { + let slot = snapshot[0].slot; + let mut result = Vec::new(); + for update in snapshot.iter() { + result.push(update.pubkey); + assert!(slot == update.slot); + } + + return Some(Snapshot(SnapshotEvent { + accounts: result, + received_at, + source: source, + slot: slot, + })); + } + _ => {} + }; + + return Some(Other); + } + + async fn init_mango_source( + configuration: &Configuration, + source: DataEventSource, + exit: Arc, + ) -> anyhow::Result> { + // + // Client setup + // + let rpc_async = Self::new_rpc_async(configuration); + + let mango_group = Pubkey::from_str(&configuration.mango_group)?; + let group_context = MangoGroupContext::new_from_rpc(&rpc_async, mango_group).await?; + + let mango_oracles = group_context + .tokens + .values() + .map(|value| value.oracle) + .chain(group_context.perp_markets.values().map(|p| p.oracle)) + .unique() + .collect::>(); + + let serum_programs = group_context + .serum3_markets + .values() + .map(|s3| s3.serum_program) + .unique() + .collect_vec(); + + let (account_update_sender, account_update_receiver) = + async_channel::unbounded::(); + + if source == DataEventSource::Grpc { + let metrics_config = mango_feeds_connector::MetricsConfig { + output_stdout: false, + output_http: false, + }; + let metrics = mango_feeds_connector::metrics::start( + metrics_config, + "benchmark-data-update".to_string(), + ); + let sources = configuration.source_configuration.grpc_sources.clone(); + + grpc_source::start( + grpc_source::Config { + rpc_http_url: configuration.source_configuration.rpc_http_url.clone(), + rpc_ws_url: configuration.source_configuration.rpc_ws_url.clone(), + serum_programs, + open_orders_authority: mango_group, + grpc_sources: sources, + }, + mango_oracles, + account_update_sender, + metrics, + exit, + ); + } else { + websocket_source::start( + websocket_source::Config { + rpc_http_url: configuration.source_configuration.rpc_http_url.clone(), + rpc_ws_url: configuration.source_configuration.rpc_ws_url.clone(), + serum_programs, + open_orders_authority: mango_group, + }, + mango_oracles, + account_update_sender, + ); + } + + Ok(account_update_receiver) + } +} diff --git a/bin/benchmark-data-update/src/processors/exit.rs b/bin/benchmark-data-update/src/processors/exit.rs new file mode 100644 index 0000000000..8ef8d0ef49 --- /dev/null +++ b/bin/benchmark-data-update/src/processors/exit.rs @@ -0,0 +1,39 @@ +use std::sync::atomic::{AtomicBool, 
Ordering}; +use std::sync::Arc; +use tokio::task::JoinHandle; +use tracing::{info, warn}; + +pub struct ExitProcessor { + pub job: JoinHandle<()>, + pub exit: Arc, +} + +impl ExitProcessor { + pub async fn init() -> anyhow::Result { + let exit: Arc = Arc::new(AtomicBool::new(false)); + let exit_clone = exit.clone(); + + let job = tokio::spawn(async move { + let mut interval = tokio::time::interval(std::time::Duration::from_millis(1000)); + loop { + if exit_clone.load(Ordering::Relaxed) { + warn!("shutting down logger processor..."); + break; + } + tokio::select! { + _ = interval.tick() => {} + _ = tokio::signal::ctrl_c()=> { + info!("Received SIGINT, shutting down..."); + exit_clone.store(true, Ordering::Relaxed); + break; + } + } + } + + warn!("shutting down exit processor..."); + }); + + let result = ExitProcessor { job, exit }; + Ok(result) + } +} diff --git a/bin/benchmark-data-update/src/processors/exporter.rs b/bin/benchmark-data-update/src/processors/exporter.rs new file mode 100644 index 0000000000..7f12088e7d --- /dev/null +++ b/bin/benchmark-data-update/src/processors/exporter.rs @@ -0,0 +1,100 @@ +use csv::Writer; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::Arc; +use tokio::task::JoinHandle; +use tracing::warn; + +use crate::configuration::Configuration; + +use super::data::{AccountUpdateEvent, DataEvent, DataEventSource}; + +pub struct ExporterProcessor { + pub job: JoinHandle<()>, +} + +impl ExporterProcessor { + pub async fn init( + configuration: &Configuration, + data_sender_1: &tokio::sync::broadcast::Sender, + data_sender_2: &tokio::sync::broadcast::Sender, + exit: Arc, + ) -> anyhow::Result { + let export_csv_path = configuration.export_csv_path.clone(); + let mut data_1 = data_sender_1.subscribe(); + let mut data_2: tokio::sync::broadcast::Receiver = data_sender_2.subscribe(); + + let job = tokio::spawn(async move { + let mut wtr = Writer::from_path(export_csv_path).expect("could not create csv file"); + let mut interval = tokio::time::interval(std::time::Duration::from_millis(5 * 1000)); + + wtr.write_record(&["slot", "time", "source", "account", "snap"]) + .expect("failed to write header"); + + loop { + if exit.load(Ordering::Relaxed) { + warn!("shutting down logger processor..."); + break; + } + + tokio::select! 
{ + _ = interval.tick() => { + wtr.flush().expect("flushing csv file failed"); + }, + Ok(msg) = data_1.recv() => Self::handle(msg, &mut wtr), + Ok(msg) = data_2.recv() => Self::handle(msg, &mut wtr), + } + } + }); + + let result = ExporterProcessor { job }; + + Ok(result) + } + + fn handle_account( + upd: AccountUpdateEvent, + writer: &mut Writer, + is_snapshot: bool, + ) { + let source = match upd.source { + DataEventSource::Websocket => "ws".to_string(), + DataEventSource::Grpc => "grpc".to_string(), + }; + let snap = match is_snapshot { + true => "snapshot".to_string(), + false => "single".to_string(), + }; + writer + .write_record(&[ + upd.slot.to_string(), + upd.received_at.to_string(), + source, + upd.account.to_string(), + snap, + ]) + .expect("failed to write account update"); + } + + fn handle(msg: DataEvent, writer: &mut Writer) { + match msg { + DataEvent::Other => {} + DataEvent::Snapshot(upd) => { + for acc in upd.accounts { + Self::handle_account( + AccountUpdateEvent { + received_at: upd.received_at, + account: acc, + source: upd.source, + slot: upd.slot, + }, + writer, + true, + ); + } + } + DataEvent::AccountUpdate(upd) => { + Self::handle_account(upd, writer, false); + } + } + } +} diff --git a/bin/benchmark-data-update/src/processors/logger.rs b/bin/benchmark-data-update/src/processors/logger.rs new file mode 100644 index 0000000000..857e7486fd --- /dev/null +++ b/bin/benchmark-data-update/src/processors/logger.rs @@ -0,0 +1,174 @@ +use hdrhistogram::Histogram; +use solana_sdk::pubkey::Pubkey; +use std::collections::HashMap; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::Arc; +use std::time::Duration; +use tokio::task::JoinHandle; +use tracing::{info, warn}; + +use super::data::{AccountUpdateEvent, DataEvent, DataEventSource}; + +pub struct LoggerProcessor { + pub job: JoinHandle<()>, +} + +impl LoggerProcessor { + pub async fn init( + data_sender_1: &tokio::sync::broadcast::Sender, + data_sender_2: &tokio::sync::broadcast::Sender, + exit: Arc, + ) -> anyhow::Result { + let mut first = true; + let mut got_1 = false; + let mut got_2 = false; + let mut data_1 = data_sender_1.subscribe(); + let mut data_2: tokio::sync::broadcast::Receiver = data_sender_2.subscribe(); + + let job = tokio::spawn(async move { + let mut interval = tokio::time::interval(std::time::Duration::from_millis(5 * 1000)); + let mut events = HashMap::::new(); + let mut grpc_late = Histogram::::new(3).unwrap(); + let mut ws_late = Histogram::::new(3).unwrap(); + + loop { + if exit.load(Ordering::Relaxed) { + warn!("shutting down logger processor..."); + break; + } + + tokio::select! 
{ + _ = interval.tick() => { + if !first { + Self::print(&mut events, &mut ws_late, &mut grpc_late); + continue; + } + + ws_late.clear(); + grpc_late.clear(); + events.clear(); + first = !got_1 && !got_2; + }, + Ok(msg) = data_1.recv() => { got_1 |= Self::handle(msg, &mut events, &mut ws_late, &mut grpc_late) }, + Ok(msg) = data_2.recv() => { got_2 |= Self::handle(msg, &mut events, &mut ws_late, &mut grpc_late) }, + } + } + }); + + let result = LoggerProcessor { job }; + + Ok(result) + } + + fn handle_account( + upd: AccountUpdateEvent, + pending_events: &mut HashMap, + ws_late: &mut Histogram, + grpc_late: &mut Histogram, + is_snapshot: bool, + ) { + let key = upd.account; + if let Some(existing) = pending_events.get(&key) { + if existing.slot > upd.slot { + // still lagging + return; + } + + let delay = (upd.received_at - existing.received_at) + .num_nanoseconds() + .unwrap(); + match existing.source { + DataEventSource::Websocket => grpc_late.record(delay as u64).unwrap(), + DataEventSource::Grpc => ws_late.record(delay as u64).unwrap(), + } + + if is_snapshot { + // only match existing, + // but don't expect matching from the other source as there is probably nothing updated for the account + pending_events.remove(&key); + return; + } + + if upd.slot == existing.slot { + pending_events.remove(&key); + } else { + pending_events.insert(key, upd); + } + } else { + if is_snapshot { + return; // ignore + } + + pending_events.insert(key, upd); + } + } + + fn print( + events: &mut HashMap, + ws_late: &mut Histogram, + grpc_late: &mut Histogram, + ) { + let ws_late = format!( + "{:?}", + Duration::from_nanos(ws_late.value_at_quantile(0.99)) + ); + let grpc_late = format!( + "{:?}", + Duration::from_nanos(grpc_late.value_at_quantile(0.99)) + ); + let pending_ws_events_count = events + .iter() + .filter(|f| f.1.source == DataEventSource::Grpc) + .count(); + let pending_grpc_events_count = events.len() - pending_ws_events_count; + + for x in events { + tracing::debug!( + "{} => {} {} (got from {})", + x.0, + x.1.slot, + x.1.received_at, + x.1.source + ) + } + + info!( + ws_lateness = %ws_late, + grpc_lateness = %grpc_late, + pending_ws_events_count = %pending_ws_events_count, + pending_grpc_events_count = %pending_grpc_events_count, + ) + } + + fn handle( + msg: DataEvent, + events: &mut HashMap, + ws_late: &mut Histogram, + grpc_late: &mut Histogram, + ) -> bool { + match msg { + DataEvent::Other => false, + DataEvent::Snapshot(upd) => { + for acc in upd.accounts { + Self::handle_account( + AccountUpdateEvent { + received_at: upd.received_at, + account: acc, + source: upd.source, + slot: upd.slot, + }, + events, + ws_late, + grpc_late, + true, + ); + } + false + } + DataEvent::AccountUpdate(upd) => { + Self::handle_account(upd, events, ws_late, grpc_late, false); + true + } + } + } +} diff --git a/bin/benchmark-data-update/src/processors/mod.rs b/bin/benchmark-data-update/src/processors/mod.rs new file mode 100644 index 0000000000..ab219f43bd --- /dev/null +++ b/bin/benchmark-data-update/src/processors/mod.rs @@ -0,0 +1,4 @@ +pub mod data; +pub mod exit; +pub mod exporter; +pub mod logger; diff --git a/bin/cli/src/save_snapshot.rs b/bin/cli/src/save_snapshot.rs index 575b900bbc..c20ab0c3d8 100644 --- a/bin/cli/src/save_snapshot.rs +++ b/bin/cli/src/save_snapshot.rs @@ -52,6 +52,7 @@ pub async fn save_snapshot( // Sourcing account and slot data from solana via websockets websocket_source::start( websocket_source::Config { + rpc_http_url: rpc_url.clone(), rpc_ws_url: ws_url.clone(), 
serum_programs, open_orders_authority: mango_group, @@ -67,7 +68,7 @@ pub async fn save_snapshot( .await?; // Getting solana account snapshots via jsonrpc - snapshot_source::start( + let snapshot_job = snapshot_source::start( snapshot_source::Config { rpc_http_url: rpc_url.clone(), mango_group, @@ -79,6 +80,11 @@ pub async fn save_snapshot( extra_accounts, account_update_sender, ); + tokio::spawn(async move { + let res = snapshot_job.await; + tracing::error!("Snapshot job exited, terminating process.. ({:?})", res); + std::process::exit(-1); + }); let mut chain_data = chain_data::ChainData::new(); @@ -93,7 +99,7 @@ pub async fn save_snapshot( match message { Message::Account(_) => {} - Message::Snapshot(snapshot) => { + Message::Snapshot(snapshot, _) => { for slot in snapshot.iter().map(|a| a.slot).unique() { chain_data.update_slot(chain_data::SlotData { slot, diff --git a/bin/liquidator/Cargo.toml b/bin/liquidator/Cargo.toml index d591bd37b2..dfe2947420 100644 --- a/bin/liquidator/Cargo.toml +++ b/bin/liquidator/Cargo.toml @@ -31,6 +31,7 @@ jsonrpc-core = "18.0.0" jsonrpc-core-client = { version = "18.0.0", features = ["ws", "http", "tls"] } mango-v4 = { path = "../../programs/mango-v4", features = ["client"] } mango-v4-client = { path = "../../lib/client" } +mango-feeds-connector = { workspace = true } once_cell = "1.12.0" pyth-sdk-solana = { workspace = true } rand = "0.7" @@ -49,4 +50,6 @@ tokio-stream = { version = "0.1.9"} tokio-tungstenite = "0.16.1" tracing = "0.1" regex = "1.9.5" -hdrhistogram = "7.5.4" \ No newline at end of file +hdrhistogram = "7.5.4" +indexmap = "2.0.0" +toml = "0.5" diff --git a/bin/liquidator/conf/example-config.toml b/bin/liquidator/conf/example-config.toml new file mode 100644 index 0000000000..a3326fa2f5 --- /dev/null +++ b/bin/liquidator/conf/example-config.toml @@ -0,0 +1,5 @@ +[[grpc_sources]] +name = "liquidator" +connection_string = "http://tyo64.rpcpool.com/" +retry_connection_sleep_secs = 30 + diff --git a/bin/liquidator/src/cli_args.rs b/bin/liquidator/src/cli_args.rs index 53ea01fad8..29f617fe21 100644 --- a/bin/liquidator/src/cli_args.rs +++ b/bin/liquidator/src/cli_args.rs @@ -136,6 +136,12 @@ pub struct Cli { #[clap(long, env, value_enum, default_value = "true")] pub(crate) take_tcs: BoolArg, + #[clap(long, env, default_value = "30")] + pub(crate) tcs_refresh_timeout_secs: u64, + + #[clap(long, env, default_value = "1000")] + pub(crate) tcs_check_interval_ms: u64, + /// profit margin at which to take tcs orders #[clap(long, env, default_value = "0.0005")] pub(crate) tcs_profit_fraction: f64, @@ -178,6 +184,10 @@ pub struct Cli { #[clap(long, env, default_value = "https://quote-api.jup.ag/v6")] pub(crate) jupiter_v6_url: String, + /// override the jupiter http request timeout + #[clap(long, env, default_value = "30")] + pub(crate) jupiter_timeout_secs: u64, + /// provide a jupiter token, currently only for jup v6 #[clap(long, env, default_value = "")] pub(crate) jupiter_token: String, @@ -191,6 +201,12 @@ pub struct Cli { #[clap(long, env, value_enum, default_value = "true")] pub(crate) telemetry: BoolArg, + /// if liquidation is enabled + /// + /// might be used to run an instance of liquidator dedicated to TCS and another one for liquidation + #[clap(long, env, value_enum, default_value = "true")] + pub(crate) liquidation_enabled: BoolArg, + /// liquidation refresh timeout in secs #[clap(long, env, default_value = "30")] pub(crate) liquidation_refresh_timeout_secs: u8, @@ -216,4 +232,12 @@ pub struct Cli { /// how long should it wait before 
logging an oracle error again (for the same token) #[clap(long, env, default_value = "30")] pub(crate) skip_oracle_error_in_logs_duration_secs: u64, + + /// max number of liquidation/tcs to do concurrently + #[clap(long, env, default_value = "5")] + pub(crate) max_parallel_operations: u64, + + /// use geyser instead of websocket - additional configuration stored as a toml file (hard to represent as a command line) + #[clap(long, env)] + pub(crate) geyser_config: Option, } diff --git a/bin/liquidator/src/configuration.rs b/bin/liquidator/src/configuration.rs new file mode 100644 index 0000000000..1eb48320ec --- /dev/null +++ b/bin/liquidator/src/configuration.rs @@ -0,0 +1,7 @@ +use mango_feeds_connector::GrpcSourceConfig; +use serde::Deserialize; + +#[derive(Clone, Debug, Deserialize, Default)] +pub struct Configuration { + pub grpc_sources: Vec, +} diff --git a/bin/liquidator/src/liquidate.rs b/bin/liquidator/src/liquidate.rs index c355aaa19f..06c3d1f018 100644 --- a/bin/liquidator/src/liquidate.rs +++ b/bin/liquidator/src/liquidate.rs @@ -9,6 +9,7 @@ use mango_v4_client::{chain_data, MangoClient, PreparedInstructions}; use solana_sdk::signature::Signature; use futures::{stream, StreamExt, TryStreamExt}; +use mango_v4::accounts_ix::HealthCheckKind::MaintRatio; use rand::seq::SliceRandom; use tracing::*; use {anyhow::Context, fixed::types::I80F48, solana_sdk::pubkey::Pubkey}; @@ -260,7 +261,22 @@ impl<'a> LiquidateHelper<'a> { ) .await .context("creating perp_liq_base_or_positive_pnl_instruction")?; + liq_ixs.cu = liq_ixs.cu.max(self.config.compute_limit_for_liq_ix); + + let liqor = &self.client.mango_account().await?; + liq_ixs.append( + self.client + .health_check_instruction( + liqor, + self.config.min_health_ratio, + vec![], + vec![*perp_market_index], + MaintRatio, + ) + .await?, + ); + let txsig = self .client .send_and_confirm_owner_tx(liq_ixs.to_instructions()) @@ -501,6 +517,20 @@ impl<'a> LiquidateHelper<'a> { .await .context("creating liq_token_with_token ix")?; liq_ixs.cu = liq_ixs.cu.max(self.config.compute_limit_for_liq_ix); + + let liqor = self.client.mango_account().await?; + liq_ixs.append( + self.client + .health_check_instruction( + &liqor, + self.config.min_health_ratio, + vec![asset_token_index, liab_token_index], + vec![], + MaintRatio, + ) + .await?, + ); + let txsig = self .client .send_and_confirm_owner_tx(liq_ixs.to_instructions()) @@ -651,14 +681,11 @@ impl<'a> LiquidateHelper<'a> { } #[allow(clippy::too_many_arguments)] -pub async fn maybe_liquidate_account( +pub async fn can_liquidate_account( mango_client: &MangoClient, account_fetcher: &chain_data::AccountFetcher, pubkey: &Pubkey, - config: &Config, ) -> anyhow::Result { - let liqor_min_health_ratio = I80F48::from_num(config.min_health_ratio); - let account = account_fetcher.fetch_mango_account(pubkey)?; let health_cache = mango_client .health_cache(&account) @@ -675,6 +702,18 @@ pub async fn maybe_liquidate_account( "possible candidate", ); + Ok(true) +} + +#[allow(clippy::too_many_arguments)] +pub async fn maybe_liquidate_account( + mango_client: &MangoClient, + account_fetcher: &chain_data::AccountFetcher, + pubkey: &Pubkey, + config: &Config, +) -> anyhow::Result { + let liqor_min_health_ratio = I80F48::from_num(config.min_health_ratio); + // Fetch a fresh account and re-compute // This is -- unfortunately -- needed because the websocket streams seem to not // be great at providing timely updates to the account data. 
diff --git a/bin/liquidator/src/liquidation_state.rs b/bin/liquidator/src/liquidation_state.rs new file mode 100644 index 0000000000..aedae78908 --- /dev/null +++ b/bin/liquidator/src/liquidation_state.rs @@ -0,0 +1,238 @@ +use crate::cli_args::Cli; +use crate::metrics::Metrics; +use crate::unwrappable_oracle_error::UnwrappableOracleError; +use crate::{liquidate, LiqErrorType, SharedState}; +use anchor_lang::prelude::Pubkey; +use itertools::Itertools; +use mango_v4::state::TokenIndex; +use mango_v4_client::error_tracking::ErrorTracking; +use mango_v4_client::{chain_data, MangoClient, MangoClientError}; +use std::sync::{Arc, RwLock}; +use std::time::{Duration, Instant}; +use tokio::task::JoinHandle; +use tracing::{error, trace, warn}; + +#[derive(Clone)] +pub struct LiquidationState { + pub mango_client: Arc, + pub account_fetcher: Arc, + pub liquidation_config: liquidate::Config, + + pub errors: Arc>>, + pub oracle_errors: Arc>>, +} + +impl LiquidationState { + async fn find_candidates( + &mut self, + accounts_iter: impl Iterator, + action: impl Fn(Pubkey) -> anyhow::Result<()>, + ) -> anyhow::Result { + let mut found_counter = 0u64; + use rand::seq::SliceRandom; + + let mut accounts = accounts_iter.collect::>(); + { + let mut rng = rand::thread_rng(); + accounts.shuffle(&mut rng); + } + + for pubkey in accounts { + if self.should_skip_execution(pubkey) { + continue; + } + + let result = + liquidate::can_liquidate_account(&self.mango_client, &self.account_fetcher, pubkey) + .await; + + self.log_or_ignore_error(&result, pubkey); + + if result.unwrap_or(false) { + action(*pubkey)?; + found_counter = found_counter + 1; + } + } + + Ok(found_counter) + } + + fn should_skip_execution(&mut self, pubkey: &Pubkey) -> bool { + let now = Instant::now(); + let error_tracking = &mut self.errors; + + // Skip a pubkey if there've been too many errors recently + if let Some(error_entry) = + error_tracking + .read() + .unwrap() + .had_too_many_errors(LiqErrorType::Liq, pubkey, now) + { + trace!( + %pubkey, + error_entry.count, + "skip checking account for liquidation, had errors recently", + ); + return true; + } + + false + } + + fn log_or_ignore_error(&mut self, result: &anyhow::Result, pubkey: &Pubkey) { + let error_tracking = &mut self.errors; + + if let Err(err) = result.as_ref() { + if let Some((ti, ti_name)) = err.try_unwrap_oracle_error() { + if self + .oracle_errors + .read() + .unwrap() + .had_too_many_errors(LiqErrorType::Liq, &ti, Instant::now()) + .is_none() + { + warn!( + "{:?} recording oracle error for token {} {}", + chrono::offset::Utc::now(), + ti_name, + ti + ); + } + + self.oracle_errors + .write() + .unwrap() + .record(LiqErrorType::Liq, &ti, err.to_string()); + return; + } + + // Keep track of pubkeys that had errors + error_tracking + .write() + .unwrap() + .record(LiqErrorType::Liq, pubkey, err.to_string()); + + // Not all errors need to be raised to the user's attention. + let mut is_error = true; + + // Simulation errors due to liqee precondition failures on the liquidation instructions + // will commonly happen if our liquidator is late or if there are chain forks. + match err.downcast_ref::() { + Some(MangoClientError::SendTransactionPreflightFailure { logs, .. 
}) => { + if logs.iter().any(|line| { + line.contains("HealthMustBeNegative") || line.contains("IsNotBankrupt") + }) { + is_error = false; + } + } + _ => {} + }; + if is_error { + error!("liquidating account {}: {:?}", pubkey, err); + } else { + trace!("liquidating account {}: {:?}", pubkey, err); + } + } else { + error_tracking + .write() + .unwrap() + .clear(LiqErrorType::Liq, pubkey); + } + } + + pub async fn maybe_liquidate_and_log_error(&mut self, pubkey: &Pubkey) -> anyhow::Result { + if self.should_skip_execution(pubkey) { + return Ok(false); + } + + let result = liquidate::maybe_liquidate_account( + &self.mango_client, + &self.account_fetcher, + pubkey, + &self.liquidation_config, + ) + .await; + + self.log_or_ignore_error(&result, pubkey); + return result; + } +} + +pub fn spawn_liquidation_job( + cli: &Cli, + shared_state: &Arc>, + tx_trigger_sender: async_channel::Sender<()>, + mut liquidation: Box, + metrics: &Metrics, +) -> JoinHandle<()> { + tokio::spawn({ + let mut interval = + mango_v4_client::delay_interval(Duration::from_millis(cli.check_interval_ms)); + let mut metric_liquidation_check = metrics.register_latency("liquidation_check".into()); + let mut metric_liquidation_start_end = + metrics.register_latency("liquidation_start_end".into()); + + let mut liquidation_start_time = None; + + let shared_state = shared_state.clone(); + async move { + loop { + interval.tick().await; + + let account_addresses = { + let mut state = shared_state.write().unwrap(); + if !state.one_snapshot_done { + // discard first latency info as it will skew data too much + state.oldest_chain_event_reception_time = None; + continue; + } + if state.oldest_chain_event_reception_time.is_none() + && liquidation_start_time.is_none() + { + // no new update, skip computing + continue; + } + + state.mango_accounts.iter().cloned().collect_vec() + }; + + liquidation.errors.write().unwrap().update(); + liquidation.oracle_errors.write().unwrap().update(); + + if liquidation_start_time.is_none() { + liquidation_start_time = Some(Instant::now()); + } + + let found_candidates = liquidation + .find_candidates(account_addresses.iter(), |p| { + if shared_state + .write() + .unwrap() + .liquidation_candidates_accounts + .insert(p) + { + tx_trigger_sender.try_send(())?; + } + + Ok(()) + }) + .await + .unwrap(); + + if found_candidates > 0 { + tracing::debug!("found {} candidates for liquidation", found_candidates); + } + + let mut state = shared_state.write().unwrap(); + let reception_time = state.oldest_chain_event_reception_time.unwrap(); + let current_time = Instant::now(); + + state.oldest_chain_event_reception_time = None; + + metric_liquidation_check.push(current_time - reception_time); + metric_liquidation_start_end.push(current_time - liquidation_start_time.unwrap()); + liquidation_start_time = None; + } + } + }) +} diff --git a/bin/liquidator/src/main.rs b/bin/liquidator/src/main.rs index 758736936b..d2a7797cdb 100644 --- a/bin/liquidator/src/main.rs +++ b/bin/liquidator/src/main.rs @@ -1,36 +1,50 @@ use std::collections::HashMap; use std::collections::HashSet; +use std::fs::File; +use std::io::Read; +use std::sync::atomic::AtomicBool; use std::sync::{Arc, RwLock}; use std::time::{Duration, Instant}; use anchor_client::Cluster; -use anyhow::Context; use clap::Parser; +use futures_util::StreamExt; +use mango_feeds_connector::MetricsConfig; use mango_v4::state::{PerpMarketIndex, TokenIndex}; -use mango_v4_client::AsyncChannelSendUnlessFull; +use mango_v4_client::account_update_stream::SnapshotType; use 
mango_v4_client::{ - account_update_stream, chain_data, error_tracking::ErrorTracking, keypair_from_cli, - snapshot_source, websocket_source, Client, MangoClient, MangoClientError, MangoGroupContext, + account_update_stream, chain_data, error_tracking::ErrorTracking, grpc_source, + keypair_from_cli, snapshot_source, websocket_source, Client, MangoClient, MangoGroupContext, TransactionBuilderConfig, }; +use crate::cli_args::{BoolArg, Cli, CliDotenv}; +use crate::configuration::Configuration; +use crate::liquidation_state::LiquidationState; +use crate::rebalance::Rebalancer; +use crate::tcs_state::TcsState; +use crate::token_swap_info::TokenSwapInfoUpdater; use itertools::Itertools; use solana_sdk::commitment_config::CommitmentConfig; use solana_sdk::pubkey::Pubkey; use solana_sdk::signer::Signer; +use tokio::task::JoinHandle; use tracing::*; pub mod cli_args; +mod configuration; pub mod liquidate; +mod liquidation_state; pub mod metrics; pub mod rebalance; +mod tcs_state; pub mod telemetry; pub mod token_swap_info; pub mod trigger_tcs; +mod tx_sender; mod unwrappable_oracle_error; pub mod util; -use crate::unwrappable_oracle_error::UnwrappableOracleError; use crate::util::{is_mango_account, is_mint_info, is_perp_market}; // jemalloc seems to be better at keeping the memory footprint reasonable over @@ -44,6 +58,7 @@ pub fn encode_address(addr: &Pubkey) -> String { #[tokio::main] async fn main() -> anyhow::Result<()> { + // env_logger::init(); mango_v4_client::tracing_subscriber_init(); let args: Vec = if let Ok(cli_dotenv) = CliDotenv::try_parse() { @@ -56,6 +71,7 @@ async fn main() -> anyhow::Result<()> { dotenv::dotenv().ok(); std::env::args_os().collect() }; + let cli = Cli::parse_from(args); // @@ -69,7 +85,7 @@ async fn main() -> anyhow::Result<()> { // Client setup // let liqor_owner = Arc::new(keypair_from_cli(&cli.liqor_owner)); - let rpc_url = cli.rpc_url; + let rpc_url = cli.rpc_url.clone(); let ws_url = rpc_url.replace("https", "wss"); let rpc_timeout = Duration::from_secs(10); let cluster = Cluster::Custom(rpc_url.clone(), ws_url.clone()); @@ -79,8 +95,9 @@ async fn main() -> anyhow::Result<()> { .commitment(commitment) .fee_payer(Some(liqor_owner.clone())) .timeout(rpc_timeout) - .jupiter_v6_url(cli.jupiter_v6_url) - .jupiter_token(cli.jupiter_token) + .jupiter_timeout(Duration::from_secs(cli.jupiter_timeout_secs)) + .jupiter_v6_url(cli.jupiter_v6_url.clone()) + .jupiter_token(cli.jupiter_token.clone()) .transaction_builder_config( TransactionBuilderConfig::builder() .priority_fee_provider(prio_provider) @@ -89,7 +106,7 @@ async fn main() -> anyhow::Result<()> { .build() .unwrap(), ) - .override_send_transaction_urls(cli.override_send_transaction_url) + .override_send_transaction_urls(cli.override_send_transaction_url.clone()) .build() .unwrap(); @@ -141,17 +158,56 @@ async fn main() -> anyhow::Result<()> { let (account_update_sender, account_update_receiver) = async_channel::unbounded::(); - // Sourcing account and slot data from solana via websockets + // Sourcing account and slot data from solana via websockets/grpc // FUTURE: websocket feed should take which accounts to listen to as an input - websocket_source::start( - websocket_source::Config { - rpc_ws_url: ws_url.clone(), - serum_programs, - open_orders_authority: mango_group, - }, - mango_oracles.clone(), - account_update_sender.clone(), - ); + if cli.geyser_config.is_some() { + let configuration = cli + .geyser_config + .as_ref() + .map(|path| { + let mut file = File::open(path).expect("invalid config file path"); + 
let mut contents = String::new(); + file.read_to_string(&mut contents) + .expect("failed to read config file"); + let config: Configuration = + toml::from_str(&contents).expect("invalid config file content"); + config + }) + .unwrap_or_default(); + + let feed_metrics = mango_feeds_connector::metrics::start( + MetricsConfig { + output_http: false, + output_stdout: true, + }, + "liquidator".to_string(), + ); + let exit = Arc::new(AtomicBool::new(false)); + grpc_source::start( + grpc_source::Config { + rpc_ws_url: ws_url.clone(), + rpc_http_url: rpc_url.clone(), + serum_programs, + open_orders_authority: mango_group, + grpc_sources: configuration.grpc_sources, + }, + mango_oracles.clone(), + account_update_sender.clone(), + feed_metrics, + exit, + ); + } else { + websocket_source::start( + websocket_source::Config { + rpc_ws_url: ws_url.clone(), + rpc_http_url: rpc_url.clone(), + serum_programs, + open_orders_authority: mango_group, + }, + mango_oracles.clone(), + account_update_sender.clone(), + ); + } let first_websocket_slot = websocket_source::get_next_create_bank_slot( account_update_receiver.clone(), @@ -161,7 +217,7 @@ async fn main() -> anyhow::Result<()> { // Getting solana account snapshots via jsonrpc // FUTURE: of what to fetch a snapshot - should probably take as an input - snapshot_source::start( + let snapshot_job = snapshot_source::start( snapshot_source::Config { rpc_http_url: rpc_url.clone(), mango_group, @@ -207,17 +263,18 @@ async fn main() -> anyhow::Result<()> { compute_limit_for_liq_ix: cli.compute_limit_for_liquidation, max_cu_per_transaction: 1_000_000, refresh_timeout: Duration::from_secs(cli.liquidation_refresh_timeout_secs as u64), - only_allowed_tokens: cli_args::cli_to_hashset::(cli.only_allow_tokens), - forbidden_tokens: cli_args::cli_to_hashset::(cli.forbidden_tokens), + only_allowed_tokens: cli_args::cli_to_hashset::(cli.only_allow_tokens.clone()), + forbidden_tokens: cli_args::cli_to_hashset::(cli.forbidden_tokens.clone()), only_allowed_perp_markets: cli_args::cli_to_hashset::( - cli.liquidation_only_allow_perp_markets, + cli.liquidation_only_allow_perp_markets.clone(), ), forbidden_perp_markets: cli_args::cli_to_hashset::( - cli.liquidation_forbidden_perp_markets, + cli.liquidation_forbidden_perp_markets.clone(), ), }; let tcs_config = trigger_tcs::Config { + refresh_timeout: Duration::from_secs(cli.tcs_refresh_timeout_secs), min_health_ratio: cli.min_health_ratio, max_trigger_quote_amount: (cli.tcs_max_trigger_amount * 1e6) as u64, compute_limit_for_trigger: cli.compute_limit_for_tcs, @@ -234,17 +291,19 @@ async fn main() -> anyhow::Result<()> { forbidden_tokens: liq_config.forbidden_tokens.clone(), }; - let mut rebalance_interval = tokio::time::interval(Duration::from_secs(30)); let (rebalance_trigger_sender, rebalance_trigger_receiver) = async_channel::bounded::<()>(1); + let (tx_tcs_trigger_sender, tx_tcs_trigger_receiver) = async_channel::unbounded::<()>(); + let (tx_liq_trigger_sender, tx_liq_trigger_receiver) = async_channel::unbounded::<()>(); let rebalance_config = rebalance::Config { enabled: cli.rebalance == BoolArg::True, slippage_bps: cli.rebalance_slippage_bps, borrow_settle_excess: (1f64 + cli.rebalance_borrow_settle_excess).max(1f64), refresh_timeout: Duration::from_secs(cli.rebalance_refresh_timeout_secs), jupiter_version: cli.jupiter_version.into(), - skip_tokens: cli.rebalance_skip_tokens.unwrap_or_default(), + skip_tokens: cli.rebalance_skip_tokens.clone().unwrap_or(Vec::new()), alternate_jupiter_route_tokens: cli 
.rebalance_alternate_jupiter_route_tokens + .clone() .unwrap_or_default(), allow_withdraws: signer_is_owner, }; @@ -257,23 +316,39 @@ async fn main() -> anyhow::Result<()> { config: rebalance_config, }); - let mut liquidation = Box::new(LiquidationState { + let liquidation = Box::new(LiquidationState { mango_client: mango_client.clone(), - account_fetcher, + account_fetcher: account_fetcher.clone(), liquidation_config: liq_config, + errors: Arc::new(RwLock::new( + ErrorTracking::builder() + .skip_threshold(2) + .skip_threshold_for_type(LiqErrorType::Liq, 5) + .skip_duration(Duration::from_secs(120)) + .build()?, + )), + oracle_errors: Arc::new(RwLock::new( + ErrorTracking::builder() + .skip_threshold(1) + .skip_duration(Duration::from_secs( + cli.skip_oracle_error_in_logs_duration_secs, + )) + .build()?, + )), + }); + + let tcs = Box::new(TcsState { + mango_client: mango_client.clone(), + account_fetcher, trigger_tcs_config: tcs_config, token_swap_info: token_swap_info_updater.clone(), - errors: ErrorTracking::builder() - .skip_threshold(2) - .skip_threshold_for_type(LiqErrorType::Liq, 5) - .skip_duration(Duration::from_secs(120)) - .build()?, - oracle_errors: ErrorTracking::builder() - .skip_threshold(1) - .skip_duration(Duration::from_secs( - cli.skip_oracle_error_in_logs_duration_secs, - )) - .build()?, + errors: Arc::new(RwLock::new( + ErrorTracking::builder() + .skip_threshold(2) + .skip_threshold_for_type(LiqErrorType::Liq, 5) + .skip_duration(Duration::from_secs(120)) + .build()?, + )), }); info!("main loop"); @@ -329,7 +404,8 @@ async fn main() -> anyhow::Result<()> { metric_mango_accounts.set(state.mango_accounts.len() as u64); } } - Message::Snapshot(snapshot) => { + Message::Snapshot(snapshot, snapshot_type) => { + debug!("Got a new snapshot ({:?})", snapshot_type); let mut state = shared_state.write().unwrap(); let mut reception_time = None; @@ -366,7 +442,9 @@ async fn main() -> anyhow::Result<()> { } metric_mango_accounts.set(state.mango_accounts.len() as u64); - state.one_snapshot_done = true; + if snapshot_type == SnapshotType::Full { + state.one_snapshot_done = true; + } } _ => {} } @@ -374,126 +452,87 @@ async fn main() -> anyhow::Result<()> { } }); + let mut optional_jobs = vec![]; + // Could be refactored to only start the below jobs when the first snapshot is done. // But need to take care to abort if the above job aborts beforehand. + if cli.rebalance == BoolArg::True { + let rebalance_job = + spawn_rebalance_job(&shared_state, rebalance_trigger_receiver, rebalancer); + optional_jobs.push(rebalance_job); + } - let rebalance_job = tokio::spawn({ - let shared_state = shared_state.clone(); - async move { - loop { - tokio::select! { - _ = rebalance_interval.tick() => {} - _ = rebalance_trigger_receiver.recv() => {} - } - if !shared_state.read().unwrap().one_snapshot_done { - continue; - } - if let Err(err) = rebalancer.zero_all_non_quote().await { - error!("failed to rebalance liqor: {:?}", err); - - // Workaround: We really need a sequence enforcer in the liquidator since we don't want to - // accidentally send a similar tx again when we incorrectly believe an earlier one got forked - // off. For now, hard sleep on error to avoid the most frequent error cases. 
- tokio::time::sleep(Duration::from_secs(10)).await; - } - } - } - }); - - let liquidation_job = tokio::spawn({ - let mut interval = - mango_v4_client::delay_interval(Duration::from_millis(cli.check_interval_ms)); - let mut metric_liquidation_check = metrics.register_latency("liquidation_check".into()); - let mut metric_liquidation_start_end = - metrics.register_latency("liquidation_start_end".into()); - - let mut liquidation_start_time = None; - let mut tcs_start_time = None; - - let shared_state = shared_state.clone(); - async move { - loop { - interval.tick().await; - - let account_addresses = { - let mut state = shared_state.write().unwrap(); - if !state.one_snapshot_done { - // discard first latency info as it will skew data too much - state.oldest_chain_event_reception_time = None; - continue; - } - if state.oldest_chain_event_reception_time.is_none() - && liquidation_start_time.is_none() - { - // no new update, skip computing - continue; - } - - state.mango_accounts.iter().cloned().collect_vec() - }; - - liquidation.errors.update(); - liquidation.oracle_errors.update(); - - if liquidation_start_time.is_none() { - liquidation_start_time = Some(Instant::now()); - } - - let liquidated = liquidation - .maybe_liquidate_one(account_addresses.iter()) - .await; + if cli.liquidation_enabled == BoolArg::True { + let liquidation_job = liquidation_state::spawn_liquidation_job( + &cli, + &shared_state, + tx_liq_trigger_sender.clone(), + liquidation.clone(), + &metrics, + ); + optional_jobs.push(liquidation_job); + } - if !liquidated { - // This will be incorrect if we liquidate the last checked account - // (We will wait for next full run, skewing latency metrics) - // Probability is very low, might not need to be fixed + if cli.take_tcs == BoolArg::True { + let tcs_job = tcs_state::spawn_tcs_job( + &cli, + &shared_state, + tx_tcs_trigger_sender.clone(), + tcs.clone(), + &metrics, + ); + optional_jobs.push(tcs_job); + } - let mut state = shared_state.write().unwrap(); - let reception_time = state.oldest_chain_event_reception_time.unwrap(); - let current_time = Instant::now(); + if cli.liquidation_enabled == BoolArg::True || cli.take_tcs == BoolArg::True { + let mut tx_sender_jobs = tx_sender::spawn_tx_senders_job( + cli.max_parallel_operations, + cli.liquidation_enabled == BoolArg::True, + tx_liq_trigger_receiver, + tx_tcs_trigger_receiver, + tx_tcs_trigger_sender, + rebalance_trigger_sender, + shared_state.clone(), + liquidation, + tcs, + ); + optional_jobs.append(&mut tx_sender_jobs); + } - state.oldest_chain_event_reception_time = None; + if cli.telemetry == BoolArg::True { + optional_jobs.push(spawn_telemetry_job(&cli, mango_client.clone())); + } - metric_liquidation_check.push(current_time - reception_time); - metric_liquidation_start_end - .push(current_time - liquidation_start_time.unwrap()); - liquidation_start_time = None; - } + let token_swap_info_job = + spawn_token_swap_refresh_job(&cli, shared_state, token_swap_info_updater); + let check_changes_for_abort_job = spawn_context_change_watchdog_job(mango_client.clone()); - let mut took_tcs = false; - if !liquidated && cli.take_tcs == BoolArg::True { - tcs_start_time = Some(tcs_start_time.unwrap_or(Instant::now())); - - took_tcs = liquidation - .maybe_take_token_conditional_swap(account_addresses.iter()) - .await - .unwrap_or_else(|err| { - error!("error during maybe_take_token_conditional_swap: {err}"); - false - }); - - if !took_tcs { - let current_time = Instant::now(); - let mut metric_tcs_start_end = - 
metrics.register_latency("tcs_start_end".into()); - metric_tcs_start_end.push(current_time - tcs_start_time.unwrap()); - tcs_start_time = None; - } - } + let mut jobs: futures::stream::FuturesUnordered<_> = vec![ + snapshot_job, + data_job, + token_swap_info_job, + check_changes_for_abort_job, + ] + .into_iter() + .chain(optional_jobs) + .chain(prio_jobs.into_iter()) + .collect(); + jobs.next().await; - if liquidated || took_tcs { - rebalance_trigger_sender.send_unless_full(()).unwrap(); - } - } - } - }); + error!("a critical job aborted, exiting"); + Ok(()) +} - let token_swap_info_job = tokio::spawn({ +fn spawn_token_swap_refresh_job( + cli: &Cli, + shared_state: Arc>, + token_swap_info_updater: Arc, +) -> JoinHandle<()> { + tokio::spawn({ let mut interval = mango_v4_client::delay_interval(Duration::from_secs( cli.token_swap_refresh_interval_secs, )); let mut startup_wait = mango_v4_client::delay_interval(Duration::from_secs(1)); - let shared_state = shared_state.clone(); async move { loop { if !shared_state.read().unwrap().one_snapshot_done { @@ -517,41 +556,56 @@ async fn main() -> anyhow::Result<()> { token_swap_info_updater.log_all(); } } - }); + }) +} - let check_changes_for_abort_job = - tokio::spawn(MangoClient::loop_check_for_context_changes_and_abort( - mango_client.clone(), - Duration::from_secs(300), - )); +fn spawn_context_change_watchdog_job(mango_client: Arc) -> JoinHandle<()> { + tokio::spawn(MangoClient::loop_check_for_context_changes_and_abort( + mango_client, + Duration::from_secs(300), + )) +} - if cli.telemetry == BoolArg::True { - tokio::spawn(telemetry::report_regularly( - mango_client, - cli.min_health_ratio, - )); - } +fn spawn_telemetry_job(cli: &Cli, mango_client: Arc) -> JoinHandle<()> { + tokio::spawn(telemetry::report_regularly( + mango_client, + cli.min_health_ratio, + )) +} - use cli_args::{BoolArg, Cli, CliDotenv}; - use futures::StreamExt; - let mut jobs: futures::stream::FuturesUnordered<_> = vec![ - data_job, - rebalance_job, - liquidation_job, - token_swap_info_job, - check_changes_for_abort_job, - ] - .into_iter() - .chain(prio_jobs.into_iter()) - .collect(); - jobs.next().await; +fn spawn_rebalance_job( + shared_state: &Arc>, + rebalance_trigger_receiver: async_channel::Receiver<()>, + rebalancer: Arc, +) -> JoinHandle<()> { + let mut rebalance_interval = tokio::time::interval(Duration::from_secs(30)); - error!("a critical job aborted, exiting"); - Ok(()) + tokio::spawn({ + let shared_state = shared_state.clone(); + async move { + loop { + tokio::select! { + _ = rebalance_interval.tick() => {} + _ = rebalance_trigger_receiver.recv() => {} + } + if !shared_state.read().unwrap().one_snapshot_done { + continue; + } + if let Err(err) = rebalancer.zero_all_non_quote().await { + error!("failed to rebalance liqor: {:?}", err); + + // Workaround: We really need a sequence enforcer in the liquidator since we don't want to + // accidentally send a similar tx again when we incorrectly believe an earlier one got forked + // off. For now, hard sleep on error to avoid the most frequent error cases. + tokio::time::sleep(Duration::from_secs(10)).await; + } + } + } + }) } #[derive(Default)] -struct SharedState { +pub struct SharedState { /// Addresses of the MangoAccounts belonging to the mango program. /// Needed to check health of them all when the cache updates. 
mango_accounts: HashSet, @@ -561,6 +615,18 @@ struct SharedState { /// Oldest chain event not processed yet oldest_chain_event_reception_time: Option, + + /// Liquidation candidates (locally identified as liquidatable) + liquidation_candidates_accounts: indexmap::set::IndexSet, + + /// Interesting TCS that should be triggered + interesting_tcs: indexmap::set::IndexSet<(Pubkey, u64, u64)>, + + /// Liquidation currently being processed by a worker + processing_liquidation: HashSet, + + // TCS currently being processed by a worker + processing_tcs: HashSet<(Pubkey, u64, u64)>, } #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] @@ -584,218 +650,6 @@ impl std::fmt::Display for LiqErrorType { } } -struct LiquidationState { - mango_client: Arc, - account_fetcher: Arc, - token_swap_info: Arc, - liquidation_config: liquidate::Config, - trigger_tcs_config: trigger_tcs::Config, - - errors: ErrorTracking, - oracle_errors: ErrorTracking, -} - -impl LiquidationState { - async fn maybe_liquidate_one<'b>( - &mut self, - accounts_iter: impl Iterator, - ) -> bool { - use rand::seq::SliceRandom; - - let mut accounts = accounts_iter.collect::>(); - { - let mut rng = rand::thread_rng(); - accounts.shuffle(&mut rng); - } - - for pubkey in accounts { - if self - .maybe_liquidate_and_log_error(pubkey) - .await - .unwrap_or(false) - { - return true; - } - } - - false - } - - async fn maybe_liquidate_and_log_error(&mut self, pubkey: &Pubkey) -> anyhow::Result { - let now = Instant::now(); - let error_tracking = &mut self.errors; - - // Skip a pubkey if there've been too many errors recently - if let Some(error_entry) = - error_tracking.had_too_many_errors(LiqErrorType::Liq, pubkey, now) - { - trace!( - %pubkey, - error_entry.count, - "skip checking account for liquidation, had errors recently", - ); - return Ok(false); - } - - let result = liquidate::maybe_liquidate_account( - &self.mango_client, - &self.account_fetcher, - pubkey, - &self.liquidation_config, - ) - .await; - - if let Err(err) = result.as_ref() { - if let Some((ti, ti_name)) = err.try_unwrap_oracle_error() { - if self - .oracle_errors - .had_too_many_errors(LiqErrorType::Liq, &ti, Instant::now()) - .is_none() - { - warn!( - "{:?} recording oracle error for token {} {}", - chrono::offset::Utc::now(), - ti_name, - ti - ); - } - - self.oracle_errors - .record(LiqErrorType::Liq, &ti, err.to_string()); - return result; - } - - // Keep track of pubkeys that had errors - error_tracking.record(LiqErrorType::Liq, pubkey, err.to_string()); - - // Not all errors need to be raised to the user's attention. - let mut is_error = true; - - // Simulation errors due to liqee precondition failures on the liquidation instructions - // will commonly happen if our liquidator is late or if there are chain forks. - match err.downcast_ref::() { - Some(MangoClientError::SendTransactionPreflightFailure { logs, .. }) => { - if logs.iter().any(|line| { - line.contains("HealthMustBeNegative") || line.contains("IsNotBankrupt") - }) { - is_error = false; - } - } - _ => {} - }; - if is_error { - error!("liquidating account {}: {:?}", pubkey, err); - } else { - trace!("liquidating account {}: {:?}", pubkey, err); - } - } else { - error_tracking.clear(LiqErrorType::Liq, pubkey); - } - - result - } - - async fn maybe_take_token_conditional_swap( - &mut self, - accounts_iter: impl Iterator, - ) -> anyhow::Result { - let accounts = accounts_iter.collect::>(); - - let now = Instant::now(); - let now_ts: u64 = std::time::SystemTime::now() - .duration_since(std::time::UNIX_EPOCH)? 
- .as_secs(); - - let tcs_context = trigger_tcs::Context { - mango_client: self.mango_client.clone(), - account_fetcher: self.account_fetcher.clone(), - token_swap_info: self.token_swap_info.clone(), - config: self.trigger_tcs_config.clone(), - jupiter_quote_cache: Arc::new(trigger_tcs::JupiterQuoteCache::default()), - now_ts, - }; - - // Find interesting (pubkey, tcsid, volume) - let mut interesting_tcs = Vec::with_capacity(accounts.len()); - for pubkey in accounts.iter() { - if let Some(error_entry) = - self.errors - .had_too_many_errors(LiqErrorType::TcsCollectionHard, pubkey, now) - { - trace!( - %pubkey, - error_entry.count, - "skip checking account for tcs, had errors recently", - ); - continue; - } - - match tcs_context.find_interesting_tcs_for_account(pubkey) { - Ok(v) => { - self.errors.clear(LiqErrorType::TcsCollectionHard, pubkey); - if v.is_empty() { - self.errors - .clear(LiqErrorType::TcsCollectionPartial, pubkey); - self.errors.clear(LiqErrorType::TcsExecution, pubkey); - } else if v.iter().all(|it| it.is_ok()) { - self.errors - .clear(LiqErrorType::TcsCollectionPartial, pubkey); - } else { - for it in v.iter() { - if let Err(e) = it { - self.errors.record( - LiqErrorType::TcsCollectionPartial, - pubkey, - e.to_string(), - ); - } - } - } - interesting_tcs.extend(v.iter().filter_map(|it| it.as_ref().ok())); - } - Err(e) => { - self.errors - .record(LiqErrorType::TcsCollectionHard, pubkey, e.to_string()); - } - } - } - if interesting_tcs.is_empty() { - return Ok(false); - } - - let (txsigs, mut changed_pubkeys) = tcs_context - .execute_tcs(&mut interesting_tcs, &mut self.errors) - .await?; - for pubkey in changed_pubkeys.iter() { - self.errors.clear(LiqErrorType::TcsExecution, pubkey); - } - if txsigs.is_empty() { - return Ok(false); - } - changed_pubkeys.push(self.mango_client.mango_account_address); - - // Force a refresh of affected accounts - let slot = self - .account_fetcher - .transaction_max_slot(&txsigs) - .await - .context("transaction_max_slot")?; - if let Err(e) = self - .account_fetcher - .refresh_accounts_via_rpc_until_slot( - &changed_pubkeys, - slot, - self.liquidation_config.refresh_timeout, - ) - .await - { - info!(slot, "could not refresh after tcs execution: {}", e); - } - - Ok(true) - } -} - fn start_chain_data_metrics(chain: Arc>, metrics: &metrics::Metrics) { let mut interval = mango_v4_client::delay_interval(Duration::from_secs(600)); diff --git a/bin/liquidator/src/rebalance.rs b/bin/liquidator/src/rebalance.rs index fbcd28f29b..7977395031 100644 --- a/bin/liquidator/src/rebalance.rs +++ b/bin/liquidator/src/rebalance.rs @@ -69,9 +69,19 @@ impl Rebalancer { "checking for rebalance" ); - self.rebalance_perps().await?; - self.rebalance_tokens().await?; + let rebalance_perps_res = self.rebalance_perps().await; + let rebalance_tokens_res = self.rebalance_tokens().await; + if rebalance_perps_res.is_err() && rebalance_tokens_res.is_err() { + anyhow::bail!( + "Failed to rebalance perps ({}) and tokens ({})", + rebalance_perps_res.unwrap_err(), + rebalance_tokens_res.unwrap_err() + ) + } + + rebalance_perps_res?; + rebalance_tokens_res?; Ok(()) } @@ -278,7 +288,7 @@ impl Rebalancer { // TODO: configurable? 
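A stripped-down sketch of the wake-up loop in spawn_rebalance_job above: the task wakes either when the 30s interval ticks or when another job sends a rebalance trigger, and both arms fall through to the same rebalance attempt. It uses tokio and async-channel as in the workspace; the one_snapshot_done guard and the 10s error back-off are omitted here.

use std::time::Duration;

#[tokio::main]
async fn main() {
    let (trigger_tx, trigger_rx) = async_channel::bounded::<()>(1);
    let mut interval = tokio::time::interval(Duration::from_secs(30));

    // Stand-in for a worker requesting an early rebalance after a successful tx.
    tokio::spawn(async move {
        tokio::time::sleep(Duration::from_secs(2)).await;
        let _ = trigger_tx.try_send(());
    });

    for _ in 0..2 {
        // Whichever happens first wakes the loop; both paths run the same rebalance code.
        tokio::select! {
            _ = interval.tick() => {}
            _ = trigger_rx.recv() => {}
        }
        println!("would call rebalancer.zero_all_non_quote() here");
    }
}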
let quote_token = self.mango_client.context.token(QUOTE_TOKEN_INDEX); - for token_position in account.active_token_positions() { + for token_position in Self::shuffle(account.active_token_positions()) { let token_index = token_position.token_index; let token = self.mango_client.context.token(token_index); if token_index == quote_token.token_index @@ -556,7 +566,7 @@ impl Rebalancer { async fn rebalance_perps(&self) -> anyhow::Result<()> { let account = self.mango_account()?; - for perp_position in account.active_perp_positions() { + for perp_position in Self::shuffle(account.active_perp_positions()) { let perp = self.mango_client.context.perp(perp_position.market_index); if !self.rebalance_perp(&account, perp, perp_position).await? { return Ok(()); @@ -565,4 +575,16 @@ impl Rebalancer { Ok(()) } + + fn shuffle(iterator: impl Iterator) -> Vec { + use rand::seq::SliceRandom; + + let mut result = iterator.collect::>(); + { + let mut rng = rand::thread_rng(); + result.shuffle(&mut rng); + } + + result + } } diff --git a/bin/liquidator/src/tcs_state.rs b/bin/liquidator/src/tcs_state.rs new file mode 100644 index 0000000000..6434a212e4 --- /dev/null +++ b/bin/liquidator/src/tcs_state.rs @@ -0,0 +1,218 @@ +use crate::cli_args::Cli; +use crate::metrics::Metrics; +use crate::token_swap_info::TokenSwapInfoUpdater; +use crate::{trigger_tcs, LiqErrorType, SharedState}; +use anchor_lang::prelude::Pubkey; +use anyhow::Context; +use itertools::Itertools; +use mango_v4_client::error_tracking::ErrorTracking; +use mango_v4_client::{chain_data, MangoClient}; +use std::sync::{Arc, RwLock}; +use std::time::{Duration, Instant}; +use tokio::task::JoinHandle; +use tracing::{error, info, trace}; + +pub fn spawn_tcs_job( + cli: &Cli, + shared_state: &Arc>, + tx_trigger_sender: async_channel::Sender<()>, + mut tcs: Box, + metrics: &Metrics, +) -> JoinHandle<()> { + tokio::spawn({ + let mut interval = + mango_v4_client::delay_interval(Duration::from_millis(cli.tcs_check_interval_ms)); + let mut tcs_start_time = None; + let mut metric_tcs_start_end = metrics.register_latency("tcs_start_end".into()); + let shared_state = shared_state.clone(); + + async move { + loop { + interval.tick().await; + + let account_addresses = { + let state = shared_state.write().unwrap(); + if !state.one_snapshot_done { + continue; + } + state.mango_accounts.iter().cloned().collect_vec() + }; + + tcs.errors.write().unwrap().update(); + + tcs_start_time = Some(tcs_start_time.unwrap_or(Instant::now())); + + let found_candidates = tcs + .find_candidates(account_addresses.iter(), |candidate| { + if shared_state + .write() + .unwrap() + .interesting_tcs + .insert(candidate) + { + tx_trigger_sender.try_send(())?; + } + + Ok(()) + }) + .await + .unwrap_or_else(|err| { + error!("error during find_candidate: {err}"); + 0 + }); + + if found_candidates > 0 { + tracing::debug!("found {} candidates for triggering", found_candidates); + } + + let current_time = Instant::now(); + metric_tcs_start_end.push(current_time - tcs_start_time.unwrap()); + tcs_start_time = None; + } + } + }) +} + +#[derive(Clone)] +pub struct TcsState { + pub mango_client: Arc, + pub account_fetcher: Arc, + pub token_swap_info: Arc, + pub trigger_tcs_config: trigger_tcs::Config, + + pub errors: Arc>>, +} + +impl TcsState { + async fn find_candidates( + &mut self, + accounts_iter: impl Iterator, + action: impl Fn((Pubkey, u64, u64)) -> anyhow::Result<()>, + ) -> anyhow::Result { + let accounts = accounts_iter.collect::>(); + + let now = Instant::now(); + let now_ts: u64 = 
std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH)? + .as_secs(); + + let tcs_context = trigger_tcs::Context { + mango_client: self.mango_client.clone(), + account_fetcher: self.account_fetcher.clone(), + token_swap_info: self.token_swap_info.clone(), + config: self.trigger_tcs_config.clone(), + jupiter_quote_cache: Arc::new(trigger_tcs::JupiterQuoteCache::default()), + now_ts, + }; + + let mut found_counter = 0; + + // Find interesting (pubkey, tcsid, volume) + for pubkey in accounts.iter() { + if let Some(error_entry) = self.errors.read().unwrap().had_too_many_errors( + LiqErrorType::TcsCollectionHard, + pubkey, + now, + ) { + trace!( + %pubkey, + error_entry.count, + "skip checking account for tcs, had errors recently", + ); + continue; + } + + let candidates = tcs_context.find_interesting_tcs_for_account(pubkey); + let mut error_guard = self.errors.write().unwrap(); + + match candidates { + Ok(v) => { + error_guard.clear(LiqErrorType::TcsCollectionHard, pubkey); + if v.is_empty() { + error_guard.clear(LiqErrorType::TcsCollectionPartial, pubkey); + error_guard.clear(LiqErrorType::TcsExecution, pubkey); + } else if v.iter().all(|it| it.is_ok()) { + error_guard.clear(LiqErrorType::TcsCollectionPartial, pubkey); + } else { + for it in v.iter() { + if let Err(e) = it { + error_guard.record( + LiqErrorType::TcsCollectionPartial, + pubkey, + e.to_string(), + ); + } + } + } + for interesting_candidate_res in v.iter() { + if let Ok(interesting_candidate) = interesting_candidate_res { + action(*interesting_candidate).expect("failed to send TCS candidate"); + found_counter += 1; + } + } + } + Err(e) => { + error_guard.record(LiqErrorType::TcsCollectionHard, pubkey, e.to_string()); + } + } + } + + return Ok(found_counter); + } + + pub async fn maybe_take_token_conditional_swap( + &mut self, + mut interesting_tcs: Vec<(Pubkey, u64, u64)>, + ) -> anyhow::Result { + let now_ts: u64 = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH)? 
+ .as_secs(); + + let tcs_context = trigger_tcs::Context { + mango_client: self.mango_client.clone(), + account_fetcher: self.account_fetcher.clone(), + token_swap_info: self.token_swap_info.clone(), + config: self.trigger_tcs_config.clone(), + jupiter_quote_cache: Arc::new(trigger_tcs::JupiterQuoteCache::default()), + now_ts, + }; + + if interesting_tcs.is_empty() { + return Ok(false); + } + + let (txsigs, mut changed_pubkeys) = tcs_context + .execute_tcs(&mut interesting_tcs, self.errors.clone()) + .await?; + for pubkey in changed_pubkeys.iter() { + self.errors + .write() + .unwrap() + .clear(LiqErrorType::TcsExecution, pubkey); + } + if txsigs.is_empty() { + return Ok(false); + } + changed_pubkeys.push(self.mango_client.mango_account_address); + + // Force a refresh of affected accounts + let slot = self + .account_fetcher + .transaction_max_slot(&txsigs) + .await + .context("transaction_max_slot")?; + if let Err(e) = self + .account_fetcher + .refresh_accounts_via_rpc_until_slot( + &changed_pubkeys, + slot, + self.trigger_tcs_config.refresh_timeout, + ) + .await + { + info!(slot, "could not refresh after tcs execution: {}", e); + } + + Ok(true) + } +} diff --git a/bin/liquidator/src/trigger_tcs.rs b/bin/liquidator/src/trigger_tcs.rs index d421048460..b5346d9af7 100644 --- a/bin/liquidator/src/trigger_tcs.rs +++ b/bin/liquidator/src/trigger_tcs.rs @@ -1,4 +1,5 @@ use std::collections::HashSet; +use std::time::Duration; use std::{ collections::HashMap, pin::Pin, @@ -15,6 +16,7 @@ use mango_v4::{ use mango_v4_client::{chain_data, jupiter, MangoClient, TransactionBuilder}; use anyhow::Context as AnyhowContext; +use mango_v4::accounts_ix::HealthCheckKind::MaintRatio; use solana_sdk::signature::Signature; use tracing::*; use {fixed::types::I80F48, solana_sdk::pubkey::Pubkey}; @@ -55,6 +57,7 @@ pub enum Mode { #[derive(Clone)] pub struct Config { + pub refresh_timeout: Duration, pub min_health_ratio: f64, pub max_trigger_quote_amount: u64, pub compute_limit_for_trigger: u32, @@ -1000,7 +1003,7 @@ impl Context { pub async fn execute_tcs( &self, tcs: &mut [(Pubkey, u64, u64)], - error_tracking: &mut ErrorTracking, + error_tracking: Arc>>, ) -> anyhow::Result<(Vec, Vec)> { use rand::distributions::{Distribution, WeightedError, WeightedIndex}; @@ -1049,7 +1052,7 @@ impl Context { } Err(e) => { trace!(%result.pubkey, "preparation error {:?}", e); - error_tracking.record( + error_tracking.write().unwrap().record( LiqErrorType::TcsExecution, &result.pubkey, e.to_string(), @@ -1093,7 +1096,7 @@ impl Context { }; // start the new one - if let Some(job) = self.prepare_job(&pubkey, tcs_id, volume, error_tracking) { + if let Some(job) = self.prepare_job(&pubkey, tcs_id, volume, error_tracking.clone()) { pending_volume += volume; pending.push(job); } @@ -1130,7 +1133,11 @@ impl Context { Ok(v) => Some((pubkey, v)), Err(err) => { trace!(%pubkey, "execution error {:?}", err); - error_tracking.record(LiqErrorType::TcsExecution, &pubkey, err.to_string()); + error_tracking.write().unwrap().record( + LiqErrorType::TcsExecution, + &pubkey, + err.to_string(), + ); None } }); @@ -1145,12 +1152,14 @@ impl Context { pubkey: &Pubkey, tcs_id: u64, volume: u64, - error_tracking: &ErrorTracking, + error_tracking: Arc>>, ) -> Option + Send>>> { // Skip a pubkey if there've been too many errors recently - if let Some(error_entry) = - error_tracking.had_too_many_errors(LiqErrorType::TcsExecution, pubkey, Instant::now()) - { + if let Some(error_entry) = error_tracking.read().unwrap().had_too_many_errors( + 
LiqErrorType::TcsExecution, + pubkey, + Instant::now(), + ) { trace!( "skip checking for tcs on account {pubkey}, had {} errors recently", error_entry.count @@ -1225,6 +1234,27 @@ impl Context { .instructions .append(&mut trigger_ixs.instructions); + let (_, tcs) = liqee.token_conditional_swap_by_id(pending.tcs_id)?; + let affected_tokens = allowed_tokens + .iter() + .chain(&[tcs.buy_token_index, tcs.sell_token_index]) + .copied() + .collect_vec(); + let liqor = &self.mango_client.mango_account().await?; + tx_builder.instructions.append( + &mut self + .mango_client + .health_check_instruction( + liqor, + self.config.min_health_ratio, + affected_tokens, + vec![], + MaintRatio, + ) + .await? + .instructions, + ); + let txsig = tx_builder .send_and_confirm(&self.mango_client.client) .await?; diff --git a/bin/liquidator/src/tx_sender.rs b/bin/liquidator/src/tx_sender.rs new file mode 100644 index 0000000000..05027f6784 --- /dev/null +++ b/bin/liquidator/src/tx_sender.rs @@ -0,0 +1,241 @@ +use crate::liquidation_state::LiquidationState; +use crate::tcs_state::TcsState; +use crate::SharedState; +use anchor_lang::prelude::Pubkey; +use async_channel::{Receiver, Sender}; +use mango_v4_client::AsyncChannelSendUnlessFull; +use std::sync::{Arc, RwLock}; +use tokio::task::JoinHandle; +use tracing::{debug, error, trace}; + +enum WorkerTask { + Liquidation(Pubkey), + Tcs(Vec<(Pubkey, u64, u64)>), + + // Given two workers: #0=LIQ_only, #1=LIQ+TCS + // If both are busy and the scanning jobs find a new TCS candidate and a new LIQ candidate and enqueue them in the channel, + // then if #1 wakes up first, it will consume the LIQ candidate (LIQ always has priority). + // When #0 then wakes up, it will not find any LIQ candidate and will do nothing (it won't take a TCS). + // But if we did nothing, #1 would never wake up again (no new task in the channel). + // So we use this `GiveUpTcs`, which #0 handles by queuing a new signal into the channel, waking #1 up again. + GiveUpTcs, + + // Can happen if TCS are batched (2 TCS enqueued, 2 workers woken, but the first one takes both tasks) + NoWork, +} + +pub fn spawn_tx_senders_job( + max_parallel_operations: u64, + enable_liquidation: bool, + tx_liq_trigger_receiver: Receiver<()>, + tx_tcs_trigger_receiver: Receiver<()>, + tx_tcs_trigger_sender: Sender<()>, + rebalance_trigger_sender: Sender<()>, + shared_state: Arc>, + liquidation: Box, + tcs: Box, +) -> Vec> { + if max_parallel_operations < 1 { + error!("max_parallel_operations must be >= 1"); + std::process::exit(1) + } + + let reserve_one_worker_for_liquidation = max_parallel_operations > 1 && enable_liquidation; + + let workers: Vec> = (0..max_parallel_operations) + .map(|worker_id| { + tokio::spawn({ + let shared_state = shared_state.clone(); + let receiver_liq = tx_liq_trigger_receiver.clone(); + let receiver_tcs = tx_tcs_trigger_receiver.clone(); + let sender_tcs = tx_tcs_trigger_sender.clone(); + let rebalance_trigger_sender = rebalance_trigger_sender.clone(); + let liquidation = liquidation.clone(); + let tcs = tcs.clone(); + async move { + worker_loop( + shared_state, + receiver_liq, + receiver_tcs, + sender_tcs, + rebalance_trigger_sender, + liquidation, + tcs, + worker_id, + reserve_one_worker_for_liquidation && worker_id == 0, + ) + .await; + } + }) + }) + .collect(); + + workers +} + +async fn worker_loop( + shared_state: Arc>, + liq_receiver: Receiver<()>, + tcs_receiver: Receiver<()>, + tcs_sender: Sender<()>, + rebalance_trigger_sender: Sender<()>, + mut liquidation: Box, + mut tcs: Box, + id: u64, + 
only_liquidation: bool, +) { + loop { + debug!( + "Worker #{} waiting for task (only_liq={})", + id, only_liquidation + ); + + let _ = if only_liquidation { + liq_receiver.recv().await.expect("receive failed") + } else { + tokio::select!( + _ = liq_receiver.recv() => {}, + _ = tcs_receiver.recv() => {}, + ) + }; + + // a task must be available to process + // find it in global shared state, and mark it as processing + let task = worker_pull_task(&shared_state, id, only_liquidation) + .expect("Worker woke up but has nothing to do"); + + // execute the task + let need_rebalancing = match &task { + WorkerTask::Liquidation(l) => worker_execute_liquidation(&mut liquidation, *l).await, + WorkerTask::Tcs(t) => worker_execute_tcs(&mut tcs, t.clone()).await, + WorkerTask::GiveUpTcs => worker_give_up_tcs(&tcs_sender).await, + WorkerTask::NoWork => false, + }; + + if need_rebalancing { + rebalance_trigger_sender.send_unless_full(()).unwrap(); + } + + // remove from shared state + worker_finalize_task(&shared_state, id, task, need_rebalancing); + } +} + +async fn worker_give_up_tcs(sender: &Sender<()>) -> bool { + sender.send(()).await.expect("sending task failed"); + false +} + +async fn worker_execute_tcs(tcs: &mut Box, candidates: Vec<(Pubkey, u64, u64)>) -> bool { + tcs.maybe_take_token_conditional_swap(candidates) + .await + .unwrap_or(false) +} + +async fn worker_execute_liquidation( + liquidation: &mut Box, + candidate: Pubkey, +) -> bool { + liquidation + .maybe_liquidate_and_log_error(&candidate) + .await + .unwrap_or(false) +} + +fn worker_pull_task( + shared_state: &Arc>, + id: u64, + only_liquidation: bool, +) -> anyhow::Result { + let mut writer = shared_state.write().unwrap(); + + // print out list of all task for debugging + for x in &writer.liquidation_candidates_accounts { + if !writer.processing_liquidation.contains(x) { + trace!(" - LIQ {:?}", x); + } + } + + // next liq task to execute + if let Some(liq_candidate) = writer + .liquidation_candidates_accounts + .iter() + .find(|x| !writer.processing_liquidation.contains(x)) + .copied() + { + debug!("worker #{} got a liq candidate -> {}", id, liq_candidate); + writer.processing_liquidation.insert(liq_candidate); + return Ok(WorkerTask::Liquidation(liq_candidate)); + } + + let tcs_todo = writer.interesting_tcs.len() - writer.processing_tcs.len(); + + if only_liquidation { + debug!("worker #{} giving up TCS (todo count: {})", id, tcs_todo); + return Ok(WorkerTask::GiveUpTcs); + } + + for x in &writer.interesting_tcs { + if !writer.processing_tcs.contains(x) { + trace!(" - TCS {:?}", x); + } + } + + // next tcs task to execute + let max_tcs_batch_size = 20; + let tcs_candidates: Vec<(Pubkey, u64, u64)> = writer + .interesting_tcs + .iter() + .filter(|x| !writer.processing_tcs.contains(x)) + .take(max_tcs_batch_size) + .copied() + .collect(); + + for tcs_candidate in &tcs_candidates { + debug!( + "worker #{} got a tcs candidate -> {:?} (out of {})", + id, + tcs_candidate, + writer.interesting_tcs.len() + ); + writer.processing_tcs.insert(tcs_candidate.clone()); + } + + if tcs_candidates.len() > 0 { + Ok(WorkerTask::Tcs(tcs_candidates)) + } else { + debug!("worker #{} got nothing", id); + Ok(WorkerTask::NoWork) + } +} + +fn worker_finalize_task( + shared_state: &Arc>, + id: u64, + task: WorkerTask, + done: bool, +) { + let mut writer = shared_state.write().unwrap(); + match task { + WorkerTask::Liquidation(liq) => { + debug!( + "worker #{} - checked liq {:?} with success ? 
{}", + id, liq, done + ); + writer.liquidation_candidates_accounts.shift_remove(&liq); + writer.processing_liquidation.remove(&liq); + } + WorkerTask::Tcs(tcs_list) => { + for tcs in tcs_list { + debug!( + "worker #{} - checked tcs {:?} with success ? {}", + id, tcs, done + ); + writer.interesting_tcs.shift_remove(&tcs); + writer.processing_tcs.remove(&tcs); + } + } + WorkerTask::GiveUpTcs => {} + WorkerTask::NoWork => {} + } +} diff --git a/bin/service-mango-crank/src/main.rs b/bin/service-mango-crank/src/main.rs index bb0c144a1b..907aa804a8 100644 --- a/bin/service-mango-crank/src/main.rs +++ b/bin/service-mango-crank/src/main.rs @@ -158,8 +158,8 @@ async fn main() -> anyhow::Result<()> { }; if use_geyser { grpc_plugin_source::process_events( - &config.source, - &filter_config, + config.source, + filter_config, account_write_queue_sender, slot_queue_sender, metrics_tx.clone(), @@ -168,8 +168,8 @@ async fn main() -> anyhow::Result<()> { .await; } else { websocket_source::process_events( - &config.source, - &filter_config, + config.source, + filter_config, account_write_queue_sender, slot_queue_sender, ) diff --git a/bin/service-mango-fills/src/main.rs b/bin/service-mango-fills/src/main.rs index 77dd666b0e..39b98adf3b 100644 --- a/bin/service-mango-fills/src/main.rs +++ b/bin/service-mango-fills/src/main.rs @@ -614,8 +614,8 @@ async fn main() -> anyhow::Result<()> { }; if use_geyser { grpc_plugin_source::process_events( - &config.source, - &filter_config, + config.source, + filter_config, account_write_queue_sender, slot_queue_sender, metrics_tx.clone(), @@ -624,8 +624,8 @@ async fn main() -> anyhow::Result<()> { .await; } else { websocket_source::process_events( - &config.source, - &filter_config, + config.source, + filter_config, account_write_queue_sender, slot_queue_sender, ) diff --git a/bin/service-mango-health/conf/example-config.toml b/bin/service-mango-health/conf/example-config.toml index 40b290f1b7..72cffd5d30 100644 --- a/bin/service-mango-health/conf/example-config.toml +++ b/bin/service-mango-health/conf/example-config.toml @@ -1,7 +1,17 @@ +mango_group = "78b8f4cGCwmZ9ysPFMWLaLTkkaYnUjwMJYStWe5RTSSX" + +[source_configuration] rpc_ws_url = "wss://mango.rpcpool.com/" rpc_http_url = "http://mango.rpcpool.com/" -mango_group = "78b8f4cGCwmZ9ysPFMWLaLTkkaYnUjwMJYStWe5RTSSX" snapshot_interval_secs = 900 +use_grpc = false +dedup_queue_size = 50000 + +[[source_configuration.grpc_sources]] +name = "service-mango-health" +connection_string = "http://tyo64.rpcpool.com/" +#token = "" +retry_connection_sleep_secs = 30 # [postgres] # connection_string = "$PG_CONNECTION_STRING" diff --git a/bin/service-mango-health/conf/template-config.toml b/bin/service-mango-health/conf/template-config.toml index ac5fcee40e..442f865d22 100644 --- a/bin/service-mango-health/conf/template-config.toml +++ b/bin/service-mango-health/conf/template-config.toml @@ -1,7 +1,17 @@ +mango_group = "78b8f4cGCwmZ9ysPFMWLaLTkkaYnUjwMJYStWe5RTSSX" + +[source_configuration] rpc_http_url = "$RPC_HTTP_URL" rpc_ws_url = "$RPC_WS_URL" -mango_group = "78b8f4cGCwmZ9ysPFMWLaLTkkaYnUjwMJYStWe5RTSSX" snapshot_interval_secs = 900 +use_grpc = false +dedup_queue_size = 50000 + +[[source_configuration.grpc_sources]] +name = "service-mango-health" +connection_string = "$GRPC_URL" +token = "$GRPC_TOKEN" +retry_connection_sleep_secs = 30 [postgres] connection_string = "$PG_CONNECTION_STRING" diff --git a/bin/service-mango-health/src/configuration.rs b/bin/service-mango-health/src/configuration.rs index 3dc95cb332..d8375b23cd 100644 --- 
a/bin/service-mango-health/src/configuration.rs +++ b/bin/service-mango-health/src/configuration.rs @@ -1,3 +1,4 @@ +use mango_feeds_connector::GrpcSourceConfig; use serde_derive::Deserialize; use services_mango_lib::env_helper::string_or_env; use services_mango_lib::postgres_configuration::PostgresConfiguration; @@ -7,16 +8,25 @@ use std::collections::HashSet; pub struct Configuration { pub postgres: Option, #[serde(deserialize_with = "string_or_env")] - pub rpc_http_url: String, - #[serde(deserialize_with = "string_or_env")] - pub rpc_ws_url: String, - #[serde(deserialize_with = "string_or_env")] pub mango_group: String, + pub source_configuration: SourceConfiguration, pub computing_configuration: ComputingConfiguration, pub logging_configuration: LoggingConfiguration, pub persistence_configuration: PersistenceConfiguration, +} + +#[derive(Clone, Debug, Deserialize)] +pub struct SourceConfiguration { + #[serde(deserialize_with = "string_or_env")] + pub rpc_http_url: String, + #[serde(deserialize_with = "string_or_env")] + pub rpc_ws_url: String, pub snapshot_interval_secs: u64, + + pub use_grpc: bool, + pub dedup_queue_size: usize, + pub grpc_sources: Vec, } #[derive(Clone, Debug, Deserialize)] diff --git a/bin/service-mango-health/src/main.rs b/bin/service-mango-health/src/main.rs index 9b3b5174a8..1baa76ed17 100644 --- a/bin/service-mango-health/src/main.rs +++ b/bin/service-mango-health/src/main.rs @@ -16,6 +16,11 @@ use crate::processors::health::HealthProcessor; use crate::processors::logger::LoggerProcessor; use crate::processors::persister::PersisterProcessor; +// jemalloc seems to be better at keeping the memory footprint reasonable over +// longer periods of time +#[global_allocator] +static ALLOC: jemallocator::Jemalloc = jemallocator::Jemalloc; + #[tokio::main] async fn main() -> anyhow::Result<()> { let args: Vec = std::env::args().collect(); @@ -64,7 +69,8 @@ async fn main() -> anyhow::Result<()> { ) .await?; - let mut jobs = vec![exit_processor.job, data_processor.job, health_processor.job]; + let mut jobs = vec![exit_processor.job, health_processor.job]; + jobs.extend(data_processor.jobs); if let Some(logger) = logger { jobs.push(logger.job) diff --git a/bin/service-mango-health/src/processors/data.rs b/bin/service-mango-health/src/processors/data.rs index 3a25923b90..a1c0f4b739 100644 --- a/bin/service-mango-health/src/processors/data.rs +++ b/bin/service-mango-health/src/processors/data.rs @@ -3,10 +3,11 @@ use crate::processors::data::DataEvent::{AccountUpdate, Other, Snapshot}; use async_channel::Receiver; use chrono::Utc; use itertools::Itertools; -use mango_v4_client::account_update_stream::Message; +use mango_v4_client::account_update_stream::{Message, SnapshotType}; use mango_v4_client::snapshot_source::is_mango_account; use mango_v4_client::{ - account_update_stream, chain_data, snapshot_source, websocket_source, MangoGroupContext, + account_update_stream, chain_data, grpc_source, snapshot_source, websocket_source, + MangoGroupContext, }; use services_mango_lib::fail_or_retry; use services_mango_lib::retry_counter::RetryCounter; @@ -22,7 +23,7 @@ use tracing::warn; pub struct DataProcessor { pub channel: tokio::sync::broadcast::Sender, - pub job: JoinHandle<()>, + pub jobs: Vec>, pub chain_data: Arc>, } @@ -37,6 +38,7 @@ pub enum DataEvent { pub struct SnapshotEvent { pub received_at: chrono::DateTime, pub accounts: Vec, + pub is_full: bool, } #[derive(Clone, Debug)] @@ -52,8 +54,10 @@ impl DataProcessor { ) -> anyhow::Result { let mut retry_counter = 
RetryCounter::new(2); let mango_group = Pubkey::from_str(&configuration.mango_group)?; - let mango_stream = - fail_or_retry!(retry_counter, Self::init_mango_source(configuration).await)?; + let (mango_stream, snapshot_job) = fail_or_retry!( + retry_counter, + Self::init_mango_source(configuration, exit.clone()).await + )?; let (sender, _) = tokio::sync::broadcast::channel(8192); let sender_clone = sender.clone(); @@ -98,7 +102,7 @@ impl DataProcessor { let result = DataProcessor { channel: sender, - job, + jobs: vec![job, snapshot_job], chain_data, }; @@ -108,7 +112,7 @@ impl DataProcessor { fn new_rpc_async(configuration: &Configuration) -> RpcClientAsync { let commitment = CommitmentConfig::processed(); RpcClientAsync::new_with_timeout_and_commitment( - configuration.rpc_http_url.clone(), + configuration.source_configuration.rpc_http_url.clone(), Duration::from_secs(60), commitment, ) @@ -128,7 +132,7 @@ impl DataProcessor { })); } } - Message::Snapshot(snapshot) => { + Message::Snapshot(snapshot, snapshot_type) => { let mut result = Vec::new(); for update in snapshot.iter() { if is_mango_account(&update.account, &mango_group).is_some() { @@ -139,6 +143,7 @@ impl DataProcessor { return Some(Snapshot(SnapshotEvent { accounts: result, received_at, + is_full: snapshot_type == SnapshotType::Full, })); } _ => {} @@ -147,7 +152,10 @@ impl DataProcessor { return Some(Other); } - async fn init_mango_source(configuration: &Configuration) -> anyhow::Result> { + async fn init_mango_source( + configuration: &Configuration, + exit: Arc, + ) -> anyhow::Result<(Receiver, JoinHandle<()>)> { // // Client setup // @@ -174,15 +182,42 @@ impl DataProcessor { let (account_update_sender, account_update_receiver) = async_channel::unbounded::(); - websocket_source::start( - websocket_source::Config { - rpc_ws_url: configuration.rpc_ws_url.clone(), - serum_programs, - open_orders_authority: mango_group, - }, - mango_oracles.clone(), - account_update_sender.clone(), - ); + if configuration.source_configuration.use_grpc { + let metrics_config = mango_feeds_connector::MetricsConfig { + output_stdout: true, + output_http: false, + }; + let metrics = mango_feeds_connector::metrics::start( + metrics_config, + "service-mango-health".to_string(), + ); + let sources = configuration.source_configuration.grpc_sources.clone(); + + grpc_source::start( + grpc_source::Config { + rpc_http_url: configuration.source_configuration.rpc_http_url.clone(), + rpc_ws_url: configuration.source_configuration.rpc_ws_url.clone(), + serum_programs, + open_orders_authority: mango_group, + grpc_sources: sources, + }, + mango_oracles.clone(), + account_update_sender.clone(), + metrics, + exit, + ); + } else { + websocket_source::start( + websocket_source::Config { + rpc_http_url: configuration.source_configuration.rpc_http_url.clone(), + rpc_ws_url: configuration.source_configuration.rpc_ws_url.clone(), + serum_programs, + open_orders_authority: mango_group, + }, + mango_oracles.clone(), + account_update_sender.clone(), + ); + } let first_websocket_slot = websocket_source::get_next_create_bank_slot( account_update_receiver.clone(), @@ -192,19 +227,21 @@ impl DataProcessor { // Getting solana account snapshots via jsonrpc // FUTURE: of what to fetch a snapshot - should probably take as an input - snapshot_source::start( + let snapshot_job = snapshot_source::start( snapshot_source::Config { - rpc_http_url: configuration.rpc_http_url.clone(), + rpc_http_url: configuration.source_configuration.rpc_http_url.clone(), mango_group, 
get_multiple_accounts_count: 100, parallel_rpc_requests: 10, - snapshot_interval: Duration::from_secs(configuration.snapshot_interval_secs), + snapshot_interval: Duration::from_secs( + configuration.source_configuration.snapshot_interval_secs, + ), min_slot: first_websocket_slot + 10, }, mango_oracles, account_update_sender, ); - Ok(account_update_receiver) + Ok((account_update_receiver, snapshot_job)) } } diff --git a/bin/service-mango-health/src/processors/health.rs b/bin/service-mango-health/src/processors/health.rs index d6dfe6fda7..b62e9672c3 100644 --- a/bin/service-mango-health/src/processors/health.rs +++ b/bin/service-mango-health/src/processors/health.rs @@ -59,7 +59,7 @@ impl HealthProcessor { let account_fetcher = chain_data::AccountFetcher { chain_data: chain_data.clone(), - rpc: RpcClient::new(configuration.rpc_http_url.clone()), + rpc: RpcClient::new(configuration.source_configuration.rpc_http_url.clone()), }; let mango_group_context = MangoGroupContext::new_from_rpc( @@ -85,7 +85,9 @@ impl HealthProcessor { for account in snap.accounts { accounts.insert(account); } - snapshot_received = true; + if snap.is_full { + snapshot_received = true; + } }, DataEvent::Other => { } diff --git a/bin/service-mango-orderbook/src/main.rs b/bin/service-mango-orderbook/src/main.rs index e2691d89d2..964d5b8170 100644 --- a/bin/service-mango-orderbook/src/main.rs +++ b/bin/service-mango-orderbook/src/main.rs @@ -606,8 +606,8 @@ async fn main() -> anyhow::Result<()> { let use_geyser = true; if use_geyser { grpc_plugin_source::process_events( - &config.source, - &filter_config, + config.source, + filter_config, account_write_queue_sender, slot_queue_sender, metrics_tx.clone(), @@ -616,8 +616,8 @@ async fn main() -> anyhow::Result<()> { .await; } else { websocket_source::process_events( - &config.source, - &filter_config, + config.source, + filter_config, account_write_queue_sender, slot_queue_sender, ) diff --git a/bin/service-mango-orderbook/src/orderbook_filter.rs b/bin/service-mango-orderbook/src/orderbook_filter.rs index 26faccb58c..185f305fd1 100644 --- a/bin/service-mango-orderbook/src/orderbook_filter.rs +++ b/bin/service-mango-orderbook/src/orderbook_filter.rs @@ -3,10 +3,11 @@ use fixed::types::I80F48; use itertools::Itertools; use log::*; use mango_feeds_connector::metrics::MetricU64; +use mango_feeds_connector::AccountWrite; use mango_feeds_connector::{ chain_data::{AccountData, ChainData, ChainDataMetrics, SlotData}, metrics::{MetricType, Metrics}, - AccountWrite, SlotUpdate, + SlotUpdate, }; use mango_feeds_lib::{ base_lots_to_ui, base_lots_to_ui_perp, price_lots_to_ui, price_lots_to_ui_perp, MarketConfig, @@ -305,7 +306,7 @@ pub async fn init( ), }, ); - } + }, Ok(slot_update) = slot_queue_receiver.recv() => { chain_cache.update_slot(SlotData { slot: slot_update.slot, @@ -313,7 +314,6 @@ pub async fn init( status: slot_update.status, chain: 0, }); - } } diff --git a/bin/service-mango-pnl/src/main.rs b/bin/service-mango-pnl/src/main.rs index 49869f379d..eebee95fb7 100644 --- a/bin/service-mango-pnl/src/main.rs +++ b/bin/service-mango-pnl/src/main.rs @@ -320,8 +320,8 @@ async fn main() -> anyhow::Result<()> { ), }; grpc_plugin_source::process_events( - &config.source, - &filter_config, + config.source, + filter_config, account_write_queue_sender, slot_queue_sender, metrics_tx.clone(), diff --git a/bin/settler/src/main.rs b/bin/settler/src/main.rs index 57f408a219..fd3550c7e9 100644 --- a/bin/settler/src/main.rs +++ b/bin/settler/src/main.rs @@ -5,6 +5,7 @@ use std::time::Duration; 
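The health processor above now flips snapshot_received only for full snapshots, and the settler change below applies the same rule to one_snapshot_done. A tiny sketch of that gating; SnapshotType mirrors the enum added to account_update_stream later in this diff, while State and on_snapshot are illustrative names.

#[derive(Clone, Copy, PartialEq, Eq, Debug)]
enum SnapshotType {
    Full,
    Partial,
}

struct State {
    one_snapshot_done: bool,
}

fn on_snapshot(state: &mut State, snapshot_type: SnapshotType) {
    // account writes from the snapshot would be applied here for both kinds
    if snapshot_type == SnapshotType::Full {
        state.one_snapshot_done = true;
    }
}

fn main() {
    let mut state = State { one_snapshot_done: false };
    on_snapshot(&mut state, SnapshotType::Partial);
    assert!(!state.one_snapshot_done);
    on_snapshot(&mut state, SnapshotType::Full);
    assert!(state.one_snapshot_done);
}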
use anchor_client::Cluster; use clap::Parser; use mango_v4::state::{PerpMarketIndex, TokenIndex}; +use mango_v4_client::account_update_stream::SnapshotType; use mango_v4_client::{ account_update_stream, chain_data, keypair_from_cli, priority_fees_cli, snapshot_source, websocket_source, Client, MangoClient, MangoGroupContext, TransactionBuilderConfig, @@ -162,6 +163,7 @@ async fn main() -> anyhow::Result<()> { // FUTURE: websocket feed should take which accounts to listen to as an input websocket_source::start( websocket_source::Config { + rpc_http_url: rpc_url.clone(), rpc_ws_url: ws_url.clone(), serum_programs, open_orders_authority: mango_group, @@ -178,7 +180,7 @@ async fn main() -> anyhow::Result<()> { // Getting solana account snapshots via jsonrpc // FUTURE: of what to fetch a snapshot - should probably take as an input - snapshot_source::start( + let snapshot_job = snapshot_source::start( snapshot_source::Config { rpc_http_url: rpc_url.clone(), mango_group, @@ -272,7 +274,7 @@ async fn main() -> anyhow::Result<()> { metric_mango_accounts.set(state.mango_accounts.len() as u64); } } - Message::Snapshot(snapshot) => { + Message::Snapshot(snapshot, snapshot_type) => { let mut state = shared_state.write().unwrap(); // Track all mango account pubkeys for update in snapshot.iter() { @@ -291,7 +293,9 @@ async fn main() -> anyhow::Result<()> { } metric_mango_accounts.set(state.mango_accounts.len() as u64); - state.one_snapshot_done = true; + if snapshot_type == SnapshotType::Full { + state.one_snapshot_done = true; + } } _ => {} } @@ -353,6 +357,7 @@ async fn main() -> anyhow::Result<()> { use futures::StreamExt; let mut jobs: futures::stream::FuturesUnordered<_> = vec![ + snapshot_job, data_job, settle_job, tcs_start_job, diff --git a/lib/client/src/account_update_stream.rs b/lib/client/src/account_update_stream.rs index ec148d996e..da68698f5c 100644 --- a/lib/client/src/account_update_stream.rs +++ b/lib/client/src/account_update_stream.rs @@ -1,6 +1,8 @@ use solana_client::rpc_response::{Response, RpcKeyedAccount}; use solana_sdk::{account::AccountSharedData, pubkey::Pubkey}; +use mango_feeds_connector::AccountWrite; +use solana_sdk::account::WritableAccount; use std::time::Instant; use std::{str::FromStr, sync::Arc}; use tracing::*; @@ -30,6 +32,23 @@ impl AccountUpdate { reception_time: Instant::now(), }) } + pub fn from_feed(rpc: AccountWrite) -> Self { + let pubkey = rpc.pubkey; + let account = AccountSharedData::create( + rpc.lamports, + rpc.data, + rpc.owner, + rpc.executable, + rpc.rent_epoch, + ); + + AccountUpdate { + pubkey, + slot: rpc.slot, + account, + reception_time: Instant::now(), + } + } } #[derive(Clone)] @@ -38,10 +57,16 @@ pub struct ChainSlotUpdate { pub reception_time: Instant, } +#[derive(Clone, Copy, Debug, PartialEq, Eq)] +pub enum SnapshotType { + Full, + Partial, +} + #[derive(Clone)] pub enum Message { Account(AccountUpdate), - Snapshot(Vec), + Snapshot(Vec, SnapshotType), Slot(ChainSlotUpdate), } @@ -60,7 +85,8 @@ impl Message { }, ); } - Message::Snapshot(snapshot) => { + Message::Snapshot(snapshot, snapshot_type) => { + trace!("websocket snapshot '{:?}' message", snapshot_type); for account_update in snapshot { chain.update_account( account_update.pubkey, @@ -73,7 +99,7 @@ impl Message { } } Message::Slot(slot_update) => { - trace!("websocket slot message"); + // trace!("websocket slot message"); let slot_update = match *(slot_update.slot_update) { solana_client::rpc_response::SlotUpdate::CreatedBank { slot, parent, .. 
diff --git a/lib/client/src/client.rs b/lib/client/src/client.rs index b6e3243a8d..65670a1c5d 100644 --- a/lib/client/src/client.rs +++ b/lib/client/src/client.rs @@ -17,7 +17,9 @@ use futures::{stream, StreamExt, TryFutureExt, TryStreamExt}; use itertools::Itertools; use tracing::*; -use mango_v4::accounts_ix::{Serum3OrderType, Serum3SelfTradeBehavior, Serum3Side}; +use mango_v4::accounts_ix::{ + HealthCheckKind, Serum3OrderType, Serum3SelfTradeBehavior, Serum3Side, +}; use mango_v4::accounts_zerocopy::KeyedAccountSharedData; use mango_v4::health::HealthCache; use mango_v4::state::{ @@ -80,6 +82,12 @@ pub struct ClientConfig { #[builder(default = "Duration::from_secs(60)")] pub timeout: Duration, + /// Jupiter Timeout, defaults to 30s + /// + /// This timeout applies to jupiter requests. + #[builder(default = "Duration::from_secs(30)")] + pub jupiter_timeout: Duration, + #[builder(default)] pub transaction_builder_config: TransactionBuilderConfig, @@ -560,6 +568,48 @@ impl MangoClient { self.send_and_confirm_owner_tx(ixs.to_instructions()).await } + /// Assert that health of account is > N + pub async fn health_check_instruction( + &self, + account: &MangoAccountValue, + min_health_value: f64, + affected_tokens: Vec, + affected_perp_markets: Vec, + check_kind: HealthCheckKind, + ) -> anyhow::Result { + let (health_check_metas, health_cu) = self + .derive_health_check_remaining_account_metas( + account, + affected_tokens, + vec![], + affected_perp_markets, + ) + .await?; + + let ixs = PreparedInstructions::from_vec( + vec![Instruction { + program_id: mango_v4::id(), + accounts: { + let mut ams = anchor_lang::ToAccountMetas::to_account_metas( + &mango_v4::accounts::HealthCheck { + group: self.group(), + account: self.mango_account_address, + }, + None, + ); + ams.extend(health_check_metas.into_iter()); + ams + }, + data: anchor_lang::InstructionData::data(&mango_v4::instruction::HealthCheck { + min_health_value, + check_kind, + }), + }], + self.instruction_cu(health_cu), + ); + Ok(ixs) + } + /// Creates token withdraw instructions for the MangoClient's account/owner. /// The `account` state is passed in separately so changes during the tx can be /// accounted for when deriving health accounts. 
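ClientConfig above gains a jupiter_timeout that is applied per request when talking to Jupiter (see the .timeout() calls on the quote and swap requests later in this diff). A minimal sketch of a per-request reqwest timeout, assuming the workspace's reqwest, tokio and anyhow crates; the URL is a placeholder, not the real Jupiter endpoint.

use std::time::Duration;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let http_client = reqwest::Client::new();
    let response = http_client
        .get("https://example.com/quote") // placeholder URL
        .timeout(Duration::from_secs(30)) // overrides any client-wide timeout for this call
        .send()
        .await?;
    println!("status: {}", response.status());
    Ok(())
}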
@@ -2094,7 +2144,10 @@ impl MangoClient { // jupiter pub fn jupiter_v6(&self) -> jupiter::v6::JupiterV6 { - jupiter::v6::JupiterV6 { mango_client: self } + jupiter::v6::JupiterV6 { + mango_client: self, + timeout_duration: self.client.config.jupiter_timeout, + } } pub fn jupiter(&self) -> jupiter::Jupiter { diff --git a/lib/client/src/grpc_source.rs b/lib/client/src/grpc_source.rs new file mode 100644 index 0000000000..374b1155a4 --- /dev/null +++ b/lib/client/src/grpc_source.rs @@ -0,0 +1,253 @@ +use jsonrpc_core::futures::StreamExt; +use jsonrpc_core_client::transports::ws; + +use mango_feeds_connector::metrics::Metrics; +use solana_sdk::pubkey::Pubkey; + +use anyhow::Context; +use async_channel::{RecvError, Sender}; +use mango_feeds_connector::{ + AccountWrite, EntityFilter, FeedFilterType, FilterConfig, GrpcSourceConfig, Memcmp, + SnapshotSourceConfig, SourceConfig, +}; +use solana_rpc::rpc_pubsub::RpcSolPubSubClient; +use std::sync::atomic::AtomicBool; +use std::sync::Arc; +use std::time::{Duration, Instant}; +use tokio_stream::StreamMap; +use tracing::*; + +use crate::account_update_stream::{AccountUpdate, ChainSlotUpdate, Message}; +use crate::AnyhowWrap; + +pub struct Config { + pub rpc_ws_url: String, + pub rpc_http_url: String, + pub serum_programs: Vec, + pub open_orders_authority: Pubkey, + pub grpc_sources: Vec, +} + +async fn feed_data( + config: &Config, + mango_oracles: Vec, + sender: async_channel::Sender, + metrics: &Metrics, + exit: Arc, +) -> anyhow::Result<()> { + let connect = ws::try_connect::(&config.rpc_ws_url).map_err_anyhow()?; + let client = connect.await.map_err_anyhow()?; + + let source_config = SourceConfig { + dedup_queue_size: 5000, + grpc_sources: config.grpc_sources.clone(), + snapshot: SnapshotSourceConfig { + rpc_http_url: config.rpc_http_url.clone(), + }, + rpc_ws_url: config.rpc_ws_url.clone(), + }; + + let serum3_oo_custom_filters = vec![ + FeedFilterType::DataSize(3228), // open orders size + // "serum" + u64 that is Initialized (1) + OpenOrders (4) + FeedFilterType::Memcmp(Memcmp { + // new_base58_encoded() does not work with old RPC nodes + offset: 0, + bytes: [0x73, 0x65, 0x72, 0x75, 0x6d, 5, 0, 0, 0, 0, 0, 0, 0].to_vec(), + }), + FeedFilterType::Memcmp(Memcmp { + offset: 45, + bytes: config.open_orders_authority.to_bytes().to_vec(), + }), + ]; + + let mut all_jobs = vec![]; + + let mango_filters = FilterConfig { + entity_filter: EntityFilter::FilterByProgramId(mango_v4::id()), + }; + let (mango_sub_sender, mango_sub_receiver) = async_channel::unbounded(); + let (mango_sub_slot_sender, mango_sub_slot_receiver) = async_channel::unbounded(); + let mango_sub_job = mango_feeds_connector::grpc_plugin_source::process_events( + source_config.clone(), + mango_filters, + mango_sub_sender, + mango_sub_slot_sender, + metrics.clone(), + exit.clone(), + ); + all_jobs.push(tokio::spawn(mango_sub_job)); + + let mango_oracles_filters = FilterConfig { + entity_filter: EntityFilter::FilterByAccountIds(mango_oracles), + }; + let (mango_oracle_sender, mango_oracle_receiver) = async_channel::unbounded(); + let (mango_oracle_slot_sender, mango_oracle_slot_receiver) = async_channel::unbounded(); + let mango_oracle_job = mango_feeds_connector::grpc_plugin_source::process_events( + source_config.clone(), + mango_oracles_filters, + mango_oracle_sender, + mango_oracle_slot_sender, + metrics.clone(), + exit.clone(), + ); + all_jobs.push(tokio::spawn(mango_oracle_job)); + + let mut serum3_oo_sub_map = StreamMap::new(); + let mut serum3_oo_slot_sub_map = StreamMap::new(); + 
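feed_data above gives every gRPC subscription (mango program, oracles, and one per serum program, set up just below) its own channel and then multiplexes them into the single account_update_stream::Message channel with a select loop. A reduced, self-contained sketch of that fan-in; the channel names and payloads are illustrative.

use std::time::Duration;

#[tokio::main]
async fn main() {
    let (mango_tx, mango_rx) = async_channel::unbounded::<&'static str>();
    let (oracle_tx, oracle_rx) = async_channel::unbounded::<&'static str>();
    let (unified_tx, unified_rx) = async_channel::unbounded::<String>();

    // Stand-in for the connector feeds that keep producing account writes.
    tokio::spawn(async move {
        loop {
            mango_tx.send("mango account write").await.ok();
            oracle_tx.send("oracle account write").await.ok();
            tokio::time::sleep(Duration::from_millis(100)).await;
        }
    });

    // Forwarder: whichever source is ready first is pushed into the unified stream,
    // the same shape as the tokio::select! loop in feed_data.
    tokio::spawn(async move {
        loop {
            let (name, msg) = tokio::select! {
                m = mango_rx.recv() => ("mango", m),
                m = oracle_rx.recv() => ("oracles", m),
            };
            match msg {
                Ok(data) => unified_tx.send(format!("{name}: {data}")).await.unwrap(),
                Err(_) => {
                    eprintln!("{name} stream closed"); // the real loop returns and reconnects
                    break;
                }
            }
        }
    });

    for _ in 0..4 {
        println!("{}", unified_rx.recv().await.unwrap());
    }
}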
for serum_program in config.serum_programs.iter() { + let (serum3_oo_sender, serum3_oo_receiver) = async_channel::unbounded(); + let (serum3_oo_slot_sender, serum3_oo_slot_receiver) = async_channel::unbounded(); + let filters = FilterConfig { + entity_filter: EntityFilter::FilterByProgramIdSelective( + *serum_program, + serum3_oo_custom_filters.clone(), + ), + }; + + let serum3_job = mango_feeds_connector::grpc_plugin_source::process_events( + source_config.clone(), + filters, + serum3_oo_sender, + serum3_oo_slot_sender, + metrics.clone(), + exit.clone(), + ); + + all_jobs.push(tokio::spawn(serum3_job)); + serum3_oo_sub_map.insert(*serum_program, serum3_oo_receiver); + serum3_oo_slot_sub_map.insert(*serum_program, serum3_oo_slot_receiver); + } + + // Make sure the serum3_oo_sub_map does not exit when there's no serum_programs + let _unused_serum_sender; + if config.serum_programs.is_empty() { + let (sender, receiver) = async_channel::unbounded::(); + _unused_serum_sender = sender; + serum3_oo_sub_map.insert(Pubkey::default(), receiver); + } + + let mut slot_sub = client.slots_updates_subscribe().map_err_anyhow()?; + + let mut jobs: futures::stream::FuturesUnordered<_> = all_jobs.into_iter().collect(); + + loop { + tokio::select! { + _ = jobs.next() => {}, + _ = mango_sub_slot_receiver.recv() => {}, + _ = mango_oracle_slot_receiver.recv() => {}, + _ = serum3_oo_slot_sub_map.next() => {}, + message = mango_sub_receiver.recv() => if !handle_message("mango", message, &sender).await { return Ok(()); }, + message = mango_oracle_receiver.recv() => if !handle_message("oracles", message, &sender).await { return Ok(()); }, + message = serum3_oo_sub_map.next() => { + if let Some((_, data)) = message { + handle_feed_write(&sender, data).await; + } else { + warn!("serum stream closed"); + return Ok(()); + } + }, + message = slot_sub.next() => { + if let Some(data) = message { + sender.send(Message::Slot(ChainSlotUpdate{ + slot_update: data.map_err_anyhow()?, + reception_time: Instant::now() + })).await.expect("sending must succeed"); + } else { + warn!("slot update stream closed"); + return Ok(()); + } + }, + _ = tokio::time::sleep(Duration::from_secs(60)) => { + warn!("grpc timeout"); + return Ok(()) + } + } + } +} + +async fn handle_message( + name: &str, + message: Result, + sender: &Sender, +) -> bool { + if let Ok(data) = message { + handle_feed_write(sender, data).await; + true + } else { + warn!("{} stream closed", name); + false + } +} + +async fn handle_feed_write(sender: &Sender, account: AccountWrite) { + sender + .send(Message::Account(AccountUpdate::from_feed(account))) + .await + .expect("sending must succeed"); +} + +pub fn start( + config: Config, + mango_oracles: Vec, + sender: async_channel::Sender, + metrics: Metrics, + exit: Arc, +) { + tokio::spawn(async move { + // if the grpc disconnects, we get no data in a while etc, reconnect and try again + loop { + info!("connecting to solana grpc streams"); + let out = feed_data( + &config, + mango_oracles.clone(), + sender.clone(), + &metrics, + exit.clone(), + ); + let result = out.await; + if let Err(err) = result { + warn!("grpc stream error: {err}"); + } + } + }); +} + +pub async fn get_next_create_bank_slot( + receiver: async_channel::Receiver, + timeout: Duration, +) -> anyhow::Result { + let start = std::time::Instant::now(); + loop { + let elapsed = start.elapsed(); + if elapsed > timeout { + anyhow::bail!( + "did not receive a slot from the grpc connection in {}s", + timeout.as_secs() + ); + } + let remaining_timeout = timeout - 
elapsed; + + let msg = match tokio::time::timeout(remaining_timeout, receiver.recv()).await { + // timeout + Err(_) => continue, + // channel close + Ok(Err(err)) => { + return Err(err).context("while waiting for first slot from grpc connection"); + } + // success + Ok(Ok(msg)) => msg, + }; + + match msg { + Message::Slot(slot_update) => { + if let solana_client::rpc_response::SlotUpdate::CreatedBank { slot, .. } = + *slot_update.slot_update + { + return Ok(slot); + } + } + _ => {} + } + } +} diff --git a/lib/client/src/jupiter/v6.rs b/lib/client/src/jupiter/v6.rs index 6c73fc7417..ff92af5a09 100644 --- a/lib/client/src/jupiter/v6.rs +++ b/lib/client/src/jupiter/v6.rs @@ -1,4 +1,5 @@ use std::str::FromStr; +use std::time::Duration; use anchor_lang::prelude::Pubkey; use serde::{Deserialize, Serialize}; @@ -139,6 +140,7 @@ impl TryFrom<&AccountMeta> for solana_sdk::instruction::AccountMeta { pub struct JupiterV6<'a> { pub mango_client: &'a MangoClient, + pub timeout_duration: Duration, } impl<'a> JupiterV6<'a> { @@ -204,6 +206,7 @@ impl<'a> JupiterV6<'a> { .http_client .get(format!("{}/quote", config.jupiter_v6_url)) .query(&query_args) + .timeout(self.timeout_duration) .send() .await .context("quote request to jupiter")?; @@ -290,6 +293,7 @@ impl<'a> JupiterV6<'a> { destination_token_account: None, // default to user ata quote_response: quote.clone(), }) + .timeout(self.timeout_duration) .send() .await .context("swap transaction request to jupiter")?; diff --git a/lib/client/src/lib.rs b/lib/client/src/lib.rs index 882a931f68..a5c5c4ff8b 100644 --- a/lib/client/src/lib.rs +++ b/lib/client/src/lib.rs @@ -12,6 +12,7 @@ pub mod confirm_transaction; mod context; pub mod error_tracking; pub mod gpa; +pub mod grpc_source; pub mod health_cache; pub mod jupiter; pub mod perp_pnl; diff --git a/lib/client/src/snapshot_source.rs b/lib/client/src/snapshot_source.rs index 44da48303d..05535fe125 100644 --- a/lib/client/src/snapshot_source.rs +++ b/lib/client/src/snapshot_source.rs @@ -1,25 +1,23 @@ +pub use crate::chain_data_fetcher::AccountFetcher; +pub use mango_feeds_connector::snapshot::*; + use jsonrpc_core_client::transports::http; use mango_v4::accounts_zerocopy::*; use mango_v4::state::{MangoAccountFixed, MangoAccountLoadedRef}; -use solana_account_decoder::{UiAccount, UiAccountEncoding}; -use solana_client::{ - rpc_config::{RpcAccountInfoConfig, RpcContextConfig, RpcProgramAccountsConfig}, - rpc_response::{OptionalContext, Response, RpcKeyedAccount}, -}; +use solana_client::rpc_config::RpcContextConfig; use solana_rpc::rpc::rpc_minimal::MinimalClient; use solana_sdk::{account::AccountSharedData, commitment_config::CommitmentConfig, pubkey::Pubkey}; use anyhow::Context; use futures::{stream, StreamExt}; -use solana_rpc::rpc::rpc_accounts::AccountsDataClient; -use solana_rpc::rpc::rpc_accounts_scan::AccountsScanClient; use std::str::FromStr; use std::time::{Duration, Instant}; +use tokio::task::JoinHandle; use tokio::time; use tracing::*; -use crate::account_update_stream::{AccountUpdate, Message}; +use crate::account_update_stream::{AccountUpdate, Message, SnapshotType}; use crate::AnyhowWrap; pub fn is_mango_account<'a>( @@ -42,14 +40,11 @@ struct AccountSnapshot { } impl AccountSnapshot { - pub fn extend_from_gpa_rpc( - &mut self, - rpc: Response>, - ) -> anyhow::Result<()> { - self.accounts.reserve(rpc.value.len()); - for a in rpc.value { + pub fn extend_from_gpa_rpc(&mut self, rpc: SnapshotProgramAccounts) -> anyhow::Result<()> { + self.accounts.reserve(rpc.accounts.len()); + for a in 
rpc.accounts { self.accounts.push(AccountUpdate { - slot: rpc.context.slot, + slot: rpc.slot, pubkey: Pubkey::from_str(&a.pubkey).unwrap(), account: a .account @@ -61,17 +56,13 @@ impl AccountSnapshot { Ok(()) } - pub fn extend_from_gma_rpc( - &mut self, - keys: &[Pubkey], - rpc: Response>>, - ) -> anyhow::Result<()> { - self.accounts.reserve(rpc.value.len()); - for (&pubkey, a) in keys.iter().zip(rpc.value.iter()) { + pub fn extend_from_gma_rpc(&mut self, rpc: SnapshotMultipleAccounts) -> anyhow::Result<()> { + self.accounts.reserve(rpc.accounts.len()); + for (key, a) in rpc.accounts.iter() { if let Some(ui_account) = a { self.accounts.push(AccountUpdate { - slot: rpc.context.slot, - pubkey, + slot: rpc.slot, + pubkey: Pubkey::from_str(key)?, account: ui_account .decode() .ok_or_else(|| anyhow::anyhow!("could not decode account"))?, @@ -97,75 +88,29 @@ async fn feed_snapshots( mango_oracles: Vec, sender: &async_channel::Sender, ) -> anyhow::Result<()> { - // TODO replace the following with mango-feeds connector's snapshot.rs - - // note: with solana 1.15 the gPA (get_program_accounts) rpc call was moved to a new mod rpc_client_scan - let rpc_client_data = - http::connect_with_options::(&config.rpc_http_url, true) - .await - .map_err_anyhow()?; - - let rpc_client_scan = - http::connect_with_options::(&config.rpc_http_url, true) - .await - .map_err_anyhow()?; - - let account_info_config = RpcAccountInfoConfig { - encoding: Some(UiAccountEncoding::Base64), - commitment: Some(CommitmentConfig::finalized()), - data_slice: None, - min_context_slot: Some(config.min_slot), - }; - let all_accounts_config = RpcProgramAccountsConfig { - filters: None, - with_context: Some(true), - account_config: account_info_config.clone(), - }; - // TODO: This way the snapshots are done sequentially, and a failing snapshot prohibits the second one to be attempted let mut snapshot = AccountSnapshot::default(); // Get all accounts of the mango program - let response = rpc_client_scan - .get_program_accounts( - mango_v4::id().to_string(), - Some(all_accounts_config.clone()), - ) + let response = get_snapshot_gpa(config.rpc_http_url.to_string(), mango_v4::id().to_string()) .await .map_err_anyhow() .context("error during getProgamAccounts for mango program")?; - if let OptionalContext::Context(account_snapshot_response) = response { - snapshot.extend_from_gpa_rpc(account_snapshot_response)?; - } else { - anyhow::bail!("did not receive context"); - } + snapshot.extend_from_gpa_rpc(response)?; // Get all the pyth oracles referred to by mango banks - let results: Vec<( - Vec, - Result>>, jsonrpc_core_client::RpcError>, - )> = stream::iter(mango_oracles) + let results: Vec> = stream::iter(mango_oracles) .chunks(config.get_multiple_accounts_count) - .map(|keys| { - let rpc_client = &rpc_client_data; - let account_info_config = account_info_config.clone(); - async move { - let string_keys = keys.iter().map(|k| k.to_string()).collect::>(); - ( - keys, - rpc_client - .get_multiple_accounts(string_keys, Some(account_info_config)) - .await, - ) - } + .map(|keys| async move { + let string_keys = keys.iter().map(|k| k.to_string()).collect::>(); + get_snapshot_gma(config.rpc_http_url.to_string(), string_keys).await }) .buffer_unordered(config.parallel_rpc_requests) .collect::>() .await; - for (keys, result) in results { + for result in results { snapshot.extend_from_gma_rpc( - &keys, result .map_err_anyhow() .context("error during getMultipleAccounts for Pyth Oracles")?, @@ -186,30 +131,17 @@ async fn feed_snapshots( .collect::>(); 
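A minimal sketch of the chunked, parallel fetch pattern feed_snapshots uses above (and reuses for the open-orders accounts just below): keys are split into get_multiple_accounts_count-sized chunks and at most parallel_rpc_requests chunk fetches are in flight at once. fake_get_multiple_accounts is a stand-in for the connector's snapshot RPC helper.

use futures::{stream, StreamExt};

async fn fake_get_multiple_accounts(keys: Vec<u32>) -> Vec<(u32, u64)> {
    // pretend every requested account exists with 1 lamport
    keys.into_iter().map(|k| (k, 1)).collect()
}

#[tokio::main]
async fn main() {
    let keys: Vec<u32> = (0..250).collect();
    let get_multiple_accounts_count = 100;
    let parallel_rpc_requests = 10;

    let results: Vec<Vec<(u32, u64)>> = stream::iter(keys)
        .chunks(get_multiple_accounts_count)
        .map(|chunk| fake_get_multiple_accounts(chunk))
        .buffer_unordered(parallel_rpc_requests)
        .collect()
        .await;

    let total: usize = results.iter().map(|r| r.len()).sum();
    assert_eq!(total, 250);
    println!("fetched {} accounts in {} chunks", total, results.len());
}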
diff --git a/lib/client/src/util.rs b/lib/client/src/util.rs
index f54d6cac9f..cd562e33c8 100644
--- a/lib/client/src/util.rs
+++ b/lib/client/src/util.rs
@@ -20,19 +20,29 @@ impl AnyhowWrap for Result {
 
 /// Push to an async_channel::Sender and ignore if the channel is full
 pub trait AsyncChannelSendUnlessFull<T> {
     /// Send a message if the channel isn't full
-    fn send_unless_full(&self, msg: T) -> Result<(), async_channel::SendError<T>>;
+    fn send_unless_full(&self, msg: T) -> anyhow::Result<()>;
 }
 
 impl<T> AsyncChannelSendUnlessFull<T> for async_channel::Sender<T> {
-    fn send_unless_full(&self, msg: T) -> Result<(), async_channel::SendError<T>> {
+    fn send_unless_full(&self, msg: T) -> anyhow::Result<()> {
         use async_channel::*;
         match self.try_send(msg) {
             Ok(()) => Ok(()),
-            Err(TrySendError::Closed(msg)) => Err(async_channel::SendError(msg)),
+            Err(TrySendError::Closed(_)) => Err(anyhow::format_err!("channel is closed")),
             Err(TrySendError::Full(_)) => Ok(()),
         }
     }
 }
+impl<T> AsyncChannelSendUnlessFull<T> for tokio::sync::mpsc::Sender<T> {
+    fn send_unless_full(&self, msg: T) -> anyhow::Result<()> {
+        use tokio::sync::mpsc::*;
+        match self.try_send(msg) {
+            Ok(()) => Ok(()),
+            Err(error::TrySendError::Closed(_)) => Err(anyhow::format_err!("channel is closed")),
+            Err(error::TrySendError::Full(_)) => Ok(()),
+        }
+    }
+}
 
 /// Like tokio::time::interval(), but with Delay as default MissedTickBehavior
 ///
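The send_unless_full change above trades the typed SendError for anyhow and adds a second implementation for tokio's bounded mpsc sender. The sketch below mirrors the shape of that patch so the semantics are easy to see in isolation; only the tokio impl is reproduced, and the trait is redefined locally purely so the snippet compiles on its own.

```rust
// Local copy of the trait from util.rs above, in the same shape as the patch.
pub trait AsyncChannelSendUnlessFull<T> {
    fn send_unless_full(&self, msg: T) -> anyhow::Result<()>;
}

impl<T> AsyncChannelSendUnlessFull<T> for tokio::sync::mpsc::Sender<T> {
    fn send_unless_full(&self, msg: T) -> anyhow::Result<()> {
        use tokio::sync::mpsc::error::TrySendError;
        match self.try_send(msg) {
            Ok(()) => Ok(()),
            // A closed channel is a real error...
            Err(TrySendError::Closed(_)) => Err(anyhow::format_err!("channel is closed")),
            // ...but a full channel just drops the message.
            Err(TrySendError::Full(_)) => Ok(()),
        }
    }
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let (tx, mut rx) = tokio::sync::mpsc::channel::<u32>(1);

    tx.send_unless_full(1)?; // accepted
    tx.send_unless_full(2)?; // channel full: message dropped, still Ok(())
    assert_eq!(rx.recv().await, Some(1));

    drop(rx);
    assert!(tx.send_unless_full(3).is_err()); // closed: now an error
    Ok(())
}
```

A full channel silently drops the message while a closed channel is reported, which is why callers can keep using `?` without special-casing back-pressure.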
diff --git a/lib/client/src/websocket_source.rs b/lib/client/src/websocket_source.rs
index c3d9b89e75..68749b2209 100644
--- a/lib/client/src/websocket_source.rs
+++ b/lib/client/src/websocket_source.rs
@@ -1,16 +1,15 @@
 use jsonrpc_core::futures::StreamExt;
 use jsonrpc_core_client::transports::ws;
-use solana_account_decoder::UiAccountEncoding;
-use solana_client::{
-    rpc_config::{RpcAccountInfoConfig, RpcProgramAccountsConfig},
-    rpc_filter::{Memcmp, RpcFilterType},
-    rpc_response::{RpcKeyedAccount, RpcResponseContext},
-};
-use solana_rpc::rpc_pubsub::RpcSolPubSubClient;
-use solana_sdk::{commitment_config::CommitmentConfig, pubkey::Pubkey};
+use solana_sdk::pubkey::Pubkey;
 
 use anyhow::Context;
+use async_channel::{RecvError, Sender};
+use mango_feeds_connector::{
+    AccountWrite, EntityFilter, FeedFilterType, FilterConfig, Memcmp, SnapshotSourceConfig,
+    SourceConfig,
+};
+use solana_rpc::rpc_pubsub::RpcSolPubSubClient;
 use std::time::{Duration, Instant};
 use tokio_stream::StreamMap;
 use tracing::*;
@@ -20,6 +19,7 @@ use crate::AnyhowWrap;
 
 pub struct Config {
     pub rpc_ws_url: String,
+    pub rpc_http_url: String,
     pub serum_programs: Vec<Pubkey>,
     pub open_orders_authority: Pubkey,
 }
@@ -32,110 +32,104 @@ async fn feed_data(
     let connect = ws::try_connect::<RpcSolPubSubClient>(&config.rpc_ws_url).map_err_anyhow()?;
     let client = connect.await.map_err_anyhow()?;
 
-    let account_info_config = RpcAccountInfoConfig {
-        encoding: Some(UiAccountEncoding::Base64),
-        commitment: Some(CommitmentConfig::processed()),
-        data_slice: None,
-        min_context_slot: None,
+    let source_config = SourceConfig {
+        dedup_queue_size: 0,
+        grpc_sources: vec![],
+        snapshot: SnapshotSourceConfig {
+            rpc_http_url: config.rpc_http_url.clone(),
+        },
+        rpc_ws_url: config.rpc_ws_url.clone(),
     };
-    let all_accounts_config = RpcProgramAccountsConfig {
-        filters: None,
-        with_context: Some(true),
-        account_config: account_info_config.clone(),
+
+    let serum3_oo_custom_filters = vec![
+        FeedFilterType::DataSize(3228), // open orders size
+        // "serum" + u64 that is Initialized (1) + OpenOrders (4)
+        FeedFilterType::Memcmp(Memcmp {
+            // new_base58_encoded() does not work with old RPC nodes
+            offset: 0,
+            bytes: [0x73, 0x65, 0x72, 0x75, 0x6d, 5, 0, 0, 0, 0, 0, 0, 0].to_vec(),
+        }),
+        FeedFilterType::Memcmp(Memcmp {
+            offset: 45,
+            bytes: config.open_orders_authority.to_bytes().to_vec(),
+        }),
+    ];
+
+    let mut all_jobs = vec![];
+
+    let mango_filters = FilterConfig {
+        entity_filter: EntityFilter::FilterByProgramId(mango_v4::id()),
     };
-    let open_orders_accounts_config = RpcProgramAccountsConfig {
-        // filter for only OpenOrders with v4 authority
-        filters: Some(vec![
-            RpcFilterType::DataSize(3228), // open orders size
-            // "serum" + u64 that is Initialized (1) + OpenOrders (4)
-            RpcFilterType::Memcmp(Memcmp::new_raw_bytes(
-                // new_base58_encoded() does not work with old RPC nodes
-                0,
-                [0x73, 0x65, 0x72, 0x75, 0x6d, 5, 0, 0, 0, 0, 0, 0, 0].to_vec(),
-            )),
-            RpcFilterType::Memcmp(Memcmp::new_raw_bytes(
-                45,
-                config.open_orders_authority.to_bytes().to_vec(),
-            )),
-        ]),
-        with_context: Some(true),
-        account_config: account_info_config.clone(),
+    let (mango_sub_sender, mango_sub_receiver) = async_channel::unbounded();
+    let (mango_sub_slot_sender, mango_sub_slot_receiver) = async_channel::unbounded();
+    let mango_sub_job = mango_feeds_connector::websocket_source::process_events(
+        source_config.clone(),
+        mango_filters,
+        mango_sub_sender,
+        mango_sub_slot_sender,
+    );
+    all_jobs.push(tokio::spawn(mango_sub_job));
+
+    let mango_oracles_filters = FilterConfig {
+        entity_filter: EntityFilter::FilterByAccountIds(mango_oracles),
     };
-    let mut mango_sub = client
-        .program_subscribe(
-            mango_v4::id().to_string(),
-            Some(all_accounts_config.clone()),
-        )
-        .map_err_anyhow()?;
-
-    let mut mango_oracles_sub_map = StreamMap::new();
-    for oracle in mango_oracles.into_iter() {
-        mango_oracles_sub_map.insert(
-            oracle,
-            client
-                .account_subscribe(
-                    oracle.to_string(),
-                    Some(RpcAccountInfoConfig {
-                        encoding: Some(UiAccountEncoding::Base64),
-                        commitment: Some(CommitmentConfig::processed()),
-                        data_slice: None,
-                        min_context_slot: None,
-                    }),
-                )
-                .map_err_anyhow()?,
-        );
-    }
+    let (mango_oracle_sender, mango_oracle_receiver) = async_channel::unbounded();
+    let (mango_oracle_slot_sender, mango_oracle_slot_receiver) = async_channel::unbounded();
+    let mango_oracle_job = mango_feeds_connector::websocket_source::process_events(
+        source_config.clone(),
+        mango_oracles_filters,
+        mango_oracle_sender,
+        mango_oracle_slot_sender,
+    );
+    all_jobs.push(tokio::spawn(mango_oracle_job));
 
     let mut serum3_oo_sub_map = StreamMap::new();
+    let mut serum3_oo_slot_sub_map = StreamMap::new();
     for serum_program in config.serum_programs.iter() {
-        serum3_oo_sub_map.insert(
-            *serum_program,
-            client
-                .program_subscribe(
-                    serum_program.to_string(),
-                    Some(open_orders_accounts_config.clone()),
-                )
-                .map_err_anyhow()?,
+        let (serum3_oo_sender, serum3_oo_receiver) = async_channel::unbounded();
+        let (serum3_oo_slot_sender, serum3_oo_slot_receiver) = async_channel::unbounded();
+        let filters = FilterConfig {
+            entity_filter: EntityFilter::FilterByProgramIdSelective(
+                *serum_program,
+                serum3_oo_custom_filters.clone(),
+            ),
+        };
+
+        let serum3_job = mango_feeds_connector::websocket_source::process_events(
+            source_config.clone(),
+            filters,
+            serum3_oo_sender,
+            serum3_oo_slot_sender,
         );
+
+        all_jobs.push(tokio::spawn(serum3_job));
+        serum3_oo_sub_map.insert(*serum_program, serum3_oo_receiver);
+        serum3_oo_slot_sub_map.insert(*serum_program, serum3_oo_slot_receiver);
     }
 
+    // Make sure the serum3_oo_sub_map does not exit when there's no serum_programs
     let _unused_serum_sender;
     if config.serum_programs.is_empty() {
-        let (sender, receiver) = jsonrpc_core::futures::channel::mpsc::unbounded();
+        let (sender, receiver) = async_channel::unbounded::<AccountWrite>();
         _unused_serum_sender = sender;
-        serum3_oo_sub_map.insert(
-            Pubkey::default(),
-            jsonrpc_core_client::TypedSubscriptionStream::new(receiver, "foo"),
-        );
+        serum3_oo_sub_map.insert(Pubkey::default(), receiver);
     }
 
     let mut slot_sub = client.slots_updates_subscribe().map_err_anyhow()?;
 
+    let mut jobs: futures::stream::FuturesUnordered<_> = all_jobs.into_iter().collect();
+
     loop {
         tokio::select! {
-            message = mango_sub.next() => {
-                if let Some(data) = message {
-                    let response = data.map_err_anyhow()?;
-                    sender.send(Message::Account(AccountUpdate::from_rpc(response)?)).await.expect("sending must succeed");
-                } else {
-                    warn!("mango stream closed");
-                    return Ok(());
-                }
-            },
-            message = mango_oracles_sub_map.next() => {
-                if let Some(data) = message {
-                    let response = data.1.map_err_anyhow()?;
-                    let response = solana_client::rpc_response::Response{ context: RpcResponseContext{ slot: response.context.slot, api_version: None }, value: RpcKeyedAccount{ pubkey: data.0.to_string(), account: response.value} } ;
-                    sender.send(Message::Account(AccountUpdate::from_rpc(response)?)).await.expect("sending must succeed");
-                } else {
-                    warn!("oracle stream closed");
-                    return Ok(());
-                }
-            },
+            _ = jobs.next() => {},
+            _ = mango_sub_slot_receiver.recv() => {},
+            _ = mango_oracle_slot_receiver.recv() => {},
+            _ = serum3_oo_slot_sub_map.next() => {},
+            message = mango_sub_receiver.recv() => if !handle_message("mango", message, &sender).await { return Ok(()); },
+            message = mango_oracle_receiver.recv() => if !handle_message("oracles", message, &sender).await { return Ok(()); },
             message = serum3_oo_sub_map.next() => {
-                if let Some(data) = message {
-                    let response = data.1.map_err_anyhow()?;
-                    sender.send(Message::Account(AccountUpdate::from_rpc(response)?)).await.expect("sending must succeed");
+                if let Some((_, data)) = message {
+                    handle_feed_write(&sender, data).await;
                 } else {
                     warn!("serum stream closed");
                     return Ok(());
@@ -160,6 +154,27 @@ async fn feed_data(
     }
 }
 
+async fn handle_message(
+    name: &str,
+    message: Result<AccountWrite, RecvError>,
+    sender: &Sender<Message>,
+) -> bool {
+    if let Ok(data) = message {
+        handle_feed_write(sender, data).await;
+        true
+    } else {
+        warn!("{} stream closed", name);
+        false
+    }
+}
+
+async fn handle_feed_write(sender: &Sender<Message>, account: AccountWrite) {
+    sender
+        .send(Message::Account(AccountUpdate::from_feed(account)))
+        .await
+        .expect("sending must succeed");
+}
+
 pub fn start(config: Config, mango_oracles: Vec<Pubkey>, sender: async_channel::Sender<Message>) {
     tokio::spawn(async move {
         // if the websocket disconnects, we get no data in a while etc, reconnect and try again
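feed_data now collects the JoinHandles of the spawned connector tasks into a FuturesUnordered and polls it as one select! arm alongside the account and slot channels, so a finished or panicked subscription task surfaces in the same loop that consumes the feeds. Below is a self-contained sketch of that supervision pattern; the toy jobs and the u64 channel are stand-ins for the real connector tasks and AccountWrite messages, not code from the patch.

```rust
use futures::stream::{FuturesUnordered, StreamExt};
use std::time::Duration;

#[tokio::main]
async fn main() {
    let (sender, receiver) = async_channel::unbounded::<u64>();

    // Spawn a few feed-like jobs and keep their JoinHandles, as feed_data
    // does with all_jobs above.
    let mut all_jobs = vec![];
    for i in 0..3u64 {
        let sender = sender.clone();
        all_jobs.push(tokio::spawn(async move {
            tokio::time::sleep(Duration::from_millis(10 * (i + 1))).await;
            sender.send(i).await.ok();
        }));
    }
    let mut jobs: FuturesUnordered<_> = all_jobs.into_iter().collect();

    loop {
        tokio::select! {
            // A completed (or panicked) job shows up here; the real loop uses
            // this arm to drain completions of the subscription tasks.
            Some(res) = jobs.next() => {
                println!("a job ended: {:?}", res);
                if jobs.is_empty() {
                    break;
                }
            },
            message = receiver.recv() => {
                println!("got message: {:?}", message);
            },
        }
    }
}
```

In the patch the `_ = jobs.next() => {}` arm only drains completions; reconnection is still handled by the retry loop in start(), per the comment above.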
diff --git a/programs/mango-v4/tests/cases/test_health_check.rs b/programs/mango-v4/tests/cases/test_health_check.rs
index b4624218b6..bb0c792a3a 100644
--- a/programs/mango-v4/tests/cases/test_health_check.rs
+++ b/programs/mango-v4/tests/cases/test_health_check.rs
@@ -7,8 +7,6 @@ use mango_v4::accounts_ix::{HealthCheck, HealthCheckKind};
 use mango_v4::error::MangoError;
 use solana_sdk::transport::TransportError;
 
-// TODO FAS
-
 #[tokio::test]
 async fn test_health_check() -> Result<(), TransportError> {
     let context = TestContext::new().await;
diff --git a/ts/client/scripts/liqtest/README.md b/ts/client/scripts/liqtest/README.md
index 847fcfdaff..f1889404d3 100644
--- a/ts/client/scripts/liqtest/README.md
+++ b/ts/client/scripts/liqtest/README.md
@@ -51,6 +51,7 @@ This creates a bunch of to-be-liquidated accounts as well as a LIQOR account.
 Run the liquidator on the group with the liqor account.
 
 Since devnet doesn't have any jupiter, run with
+
 ```
 JUPITER_VERSION=mock TCS_MODE=borrow-buy