diff --git a/examples/live/binance_spot_market_maker.py b/examples/live/binance_spot_market_maker.py index 87f95a233387..57be0419f4eb 100644 --- a/examples/live/binance_spot_market_maker.py +++ b/examples/live/binance_spot_market_maker.py @@ -21,7 +21,6 @@ from nautilus_trader.adapters.binance.config import BinanceExecClientConfig from nautilus_trader.adapters.binance.factories import BinanceLiveDataClientFactory from nautilus_trader.adapters.binance.factories import BinanceLiveExecClientFactory -from nautilus_trader.config import CacheDatabaseConfig from nautilus_trader.config import InstrumentProviderConfig from nautilus_trader.config import LiveExecEngineConfig from nautilus_trader.config import LoggingConfig @@ -45,7 +44,10 @@ reconciliation=True, reconciliation_lookback_mins=1440, ), - cache_database=CacheDatabaseConfig(type="redis"), + # cache_database=CacheDatabaseConfig( + # type="redis", + # buffer_interval_ms=100, + # ), # message_bus=MessageBusConfig( # database=DatabaseConfig(), # encoding="json", diff --git a/nautilus_core/infrastructure/src/cache.rs b/nautilus_core/infrastructure/src/cache.rs index afd77cf3e292..8825a1a1b531 100644 --- a/nautilus_core/infrastructure/src/cache.rs +++ b/nautilus_core/infrastructure/src/cache.rs @@ -69,7 +69,7 @@ pub trait CacheDatabase { fn insert(&mut self, key: String, payload: Option>>) -> Result<()>; fn update(&mut self, key: String, payload: Option>>) -> Result<()>; fn delete(&mut self, key: String, payload: Option>>) -> Result<()>; - fn handle_ops( + fn handle_messages( rx: Receiver, trader_key: String, config: HashMap, diff --git a/nautilus_core/infrastructure/src/redis.rs b/nautilus_core/infrastructure/src/redis.rs index 9bf6a9a8411e..93118f1ec6c4 100644 --- a/nautilus_core/infrastructure/src/redis.rs +++ b/nautilus_core/infrastructure/src/redis.rs @@ -14,16 +14,17 @@ // ------------------------------------------------------------------------------------------------- use std::{ - collections::HashMap, - 
sync::mpsc::{channel, Receiver, Sender}, + collections::{HashMap, VecDeque}, + sync::mpsc::{channel, Receiver, Sender, TryRecvError}, thread, + time::{Duration, Instant}, }; use anyhow::{anyhow, bail, Result}; use nautilus_core::uuid::UUID4; use nautilus_model::identifiers::trader_id::TraderId; use pyo3::prelude::*; -use redis::{Commands, Connection}; +use redis::{Commands, Connection, Pipeline}; use serde_json::{json, Value}; use crate::cache::{CacheDatabase, DatabaseCommand, DatabaseOperation}; @@ -90,7 +91,7 @@ impl CacheDatabase for RedisCacheDatabase { let trader_key_clone = trader_key.clone(); thread::spawn(move || { - Self::handle_ops(rx, trader_key_clone, config); + Self::handle_messages(rx, trader_key_clone, config); }); Ok(RedisCacheDatabase { @@ -158,7 +159,7 @@ impl CacheDatabase for RedisCacheDatabase { } } - fn handle_ops( + fn handle_messages( rx: Receiver, trader_key: String, config: HashMap, @@ -167,37 +168,65 @@ impl CacheDatabase for RedisCacheDatabase { let client = redis::Client::open(redis_url).unwrap(); let mut conn = client.get_connection().unwrap(); - // Continue to receive and handle bus messages until channel is hung up - while let Ok(msg) = rx.recv() { - let collection = match get_collection_key(&msg.key) { - Ok(collection) => collection, - Err(e) => { - eprintln!("{e}"); - continue; // Continue to next message + // Buffering machinery + let mut buffer: VecDeque = VecDeque::new(); + let mut last_drain = Instant::now(); + let recv_interval = Duration::from_millis(1); + let buffer_interval = get_buffer_interval(&config); + + loop { + if last_drain.elapsed() >= buffer_interval && !buffer.is_empty() { + drain_buffer(&mut conn, &trader_key, &mut buffer); + last_drain = Instant::now(); + } else { + // Continue to receive and handle messages until channel is hung up + match rx.try_recv() { + Ok(msg) => buffer.push_back(msg), + Err(TryRecvError::Empty) => thread::sleep(recv_interval), + Err(TryRecvError::Disconnected) => return, // Channel hung 
up } - }; + } + } + } +} + +fn drain_buffer(conn: &mut Connection, trader_key: &str, buffer: &mut VecDeque) { + let mut pipe = redis::pipe(); + pipe.atomic(); - let key = format!("{trader_key}{DELIMITER}{}", msg.key); + for msg in buffer.drain(..) { + let collection = match get_collection_key(&msg.key) { + Ok(collection) => collection, + Err(e) => { + eprintln!("{e}"); + continue; // Continue to next message + } + }; - match msg.op_type { - DatabaseOperation::Insert => { - if let Err(e) = insert(&mut conn, collection, &key, msg.payload) { - eprintln!("{e}"); - } + let key = format!("{trader_key}{DELIMITER}{}", msg.key); + + match msg.op_type { + DatabaseOperation::Insert => { + if let Err(e) = insert(&mut pipe, collection, &key, msg.payload) { + eprintln!("{e}"); } - DatabaseOperation::Update => { - if let Err(e) = update(&mut conn, collection, &key, msg.payload) { - eprintln!("{e}"); - } + } + DatabaseOperation::Update => { + if let Err(e) = update(&mut pipe, collection, &key, msg.payload) { + eprintln!("{e}"); } - DatabaseOperation::Delete => { - if let Err(e) = delete(&mut conn, collection, &key, msg.payload) { - eprintln!("{e}"); - } + } + DatabaseOperation::Delete => { + if let Err(e) = delete(&mut pipe, collection, &key, msg.payload) { + eprintln!("{e}"); } } } } + + if let Err(e) = pipe.query::<()>(conn) { + eprintln!("{e}"); + } } fn read_index(conn: &mut Connection, key: &str) -> Result>> { @@ -245,7 +274,7 @@ fn read_list(conn: &mut Connection, key: &str) -> Result>> { } fn insert( - conn: &mut Connection, + pipe: &mut Pipeline, collection: &str, key: &str, value: Option>>, @@ -256,62 +285,124 @@ fn insert( } match collection { - INDEX => insert_index(conn, key, &value), - GENERAL => insert_string(conn, key, &value[0]), - CURRENCIES => insert_string(conn, key, &value[0]), - INSTRUMENTS => insert_string(conn, key, &value[0]), - SYNTHETICS => insert_string(conn, key, &value[0]), - ACCOUNTS => insert_list(conn, key, &value[0]), - ORDERS => insert_list(conn, 
key, &value[0]), - POSITIONS => insert_list(conn, key, &value[0]), - ACTORS => insert_string(conn, key, &value[0]), - STRATEGIES => insert_string(conn, key, &value[0]), - SNAPSHOTS => insert_list(conn, key, &value[0]), - HEALTH => insert_string(conn, key, &value[0]), + INDEX => insert_index(pipe, key, &value), + GENERAL => { + insert_string(pipe, key, &value[0]); + Ok(()) + } + CURRENCIES => { + insert_string(pipe, key, &value[0]); + Ok(()) + } + INSTRUMENTS => { + insert_string(pipe, key, &value[0]); + Ok(()) + } + SYNTHETICS => { + insert_string(pipe, key, &value[0]); + Ok(()) + } + ACCOUNTS => { + insert_list(pipe, key, &value[0]); + Ok(()) + } + ORDERS => { + insert_list(pipe, key, &value[0]); + Ok(()) + } + POSITIONS => { + insert_list(pipe, key, &value[0]); + Ok(()) + } + ACTORS => { + insert_string(pipe, key, &value[0]); + Ok(()) + } + STRATEGIES => { + insert_string(pipe, key, &value[0]); + Ok(()) + } + SNAPSHOTS => { + insert_list(pipe, key, &value[0]); + Ok(()) + } + HEALTH => { + insert_string(pipe, key, &value[0]); + Ok(()) + } _ => bail!("Unsupported operation: `insert` for collection '{collection}'"), } } -fn insert_index(conn: &mut Connection, key: &str, value: &[Vec]) -> Result<()> { +fn insert_index(pipe: &mut Pipeline, key: &str, value: &[Vec]) -> Result<()> { let index_key = get_index_key(key)?; match index_key { - INDEX_ORDER_IDS => insert_set(conn, key, &value[0]), - INDEX_ORDER_POSITION => insert_hset(conn, key, &value[0], &value[1]), - INDEX_ORDER_CLIENT => insert_hset(conn, key, &value[0], &value[1]), - INDEX_ORDERS => insert_set(conn, key, &value[0]), - INDEX_ORDERS_OPEN => insert_set(conn, key, &value[0]), - INDEX_ORDERS_CLOSED => insert_set(conn, key, &value[0]), - INDEX_ORDERS_EMULATED => insert_set(conn, key, &value[0]), - INDEX_ORDERS_INFLIGHT => insert_set(conn, key, &value[0]), - INDEX_POSITIONS => insert_set(conn, key, &value[0]), - INDEX_POSITIONS_OPEN => insert_set(conn, key, &value[0]), - INDEX_POSITIONS_CLOSED => 
insert_set(conn, key, &value[0]), + INDEX_ORDER_IDS => { + insert_set(pipe, key, &value[0]); + Ok(()) + } + INDEX_ORDER_POSITION => { + insert_hset(pipe, key, &value[0], &value[1]); + Ok(()) + } + INDEX_ORDER_CLIENT => { + insert_hset(pipe, key, &value[0], &value[1]); + Ok(()) + } + INDEX_ORDERS => { + insert_set(pipe, key, &value[0]); + Ok(()) + } + INDEX_ORDERS_OPEN => { + insert_set(pipe, key, &value[0]); + Ok(()) + } + INDEX_ORDERS_CLOSED => { + insert_set(pipe, key, &value[0]); + Ok(()) + } + INDEX_ORDERS_EMULATED => { + insert_set(pipe, key, &value[0]); + Ok(()) + } + INDEX_ORDERS_INFLIGHT => { + insert_set(pipe, key, &value[0]); + Ok(()) + } + INDEX_POSITIONS => { + insert_set(pipe, key, &value[0]); + Ok(()) + } + INDEX_POSITIONS_OPEN => { + insert_set(pipe, key, &value[0]); + Ok(()) + } + INDEX_POSITIONS_CLOSED => { + insert_set(pipe, key, &value[0]); + Ok(()) + } _ => bail!("Index unknown '{index_key}' on insert"), } } -fn insert_string(conn: &mut Connection, key: &str, value: &Vec) -> Result<()> { - conn.set(key, value) - .map_err(|e| anyhow!("Failed to set '{key}' in Redis: {e}")) +fn insert_string(pipe: &mut Pipeline, key: &str, value: &Vec) { + pipe.set(key, value); } -fn insert_set(conn: &mut Connection, key: &str, value: &Vec) -> Result<()> { - conn.sadd(key, value) - .map_err(|e| anyhow!("Failed to sadd '{key}' in Redis: {e}")) +fn insert_set(pipe: &mut Pipeline, key: &str, value: &Vec) { + pipe.sadd(key, value); } -fn insert_hset(conn: &mut Connection, key: &str, name: &Vec, value: &Vec) -> Result<()> { - conn.hset(key, name, value) - .map_err(|e| anyhow!("Failed to hset '{key}' in Redis: {e}")) +fn insert_hset(pipe: &mut Pipeline, key: &str, name: &Vec, value: &Vec) { + pipe.hset(key, name, value); } -fn insert_list(conn: &mut Connection, key: &str, value: &Vec) -> Result<()> { - conn.rpush(key, value) - .map_err(|e| anyhow!("Failed to rpush '{key}' in Redis: {e}")) +fn insert_list(pipe: &mut Pipeline, key: &str, value: &Vec) { + pipe.rpush(key, 
value); } fn update( - conn: &mut Connection, + pipe: &mut Pipeline, collection: &str, key: &str, value: Option>>, @@ -322,55 +413,85 @@ fn update( } match collection { - ACCOUNTS => update_list(conn, key, &value[0]), - ORDERS => update_list(conn, key, &value[0]), - POSITIONS => update_list(conn, key, &value[0]), + ACCOUNTS => { + update_list(pipe, key, &value[0]); + Ok(()) + } + ORDERS => { + update_list(pipe, key, &value[0]); + Ok(()) + } + POSITIONS => { + update_list(pipe, key, &value[0]); + Ok(()) + } _ => bail!("Unsupported operation: `update` for collection '{collection}'"), } } -fn update_list(conn: &mut Connection, key: &str, value: &Vec) -> Result<()> { - conn.rpush_exists(key, value) - .map_err(|e| anyhow!("Failed to rpush '{key}' in Redis: {e}")) +fn update_list(pipe: &mut Pipeline, key: &str, value: &Vec) { + pipe.rpush_exists(key, value); } fn delete( - conn: &mut Connection, + pipe: &mut Pipeline, collection: &str, key: &str, value: Option>>, ) -> Result<()> { match collection { - INDEX => remove_index(conn, key, value), - ACTORS => delete_string(conn, key), - STRATEGIES => delete_string(conn, key), + INDEX => remove_index(pipe, key, value), + ACTORS => { + delete_string(pipe, key); + Ok(()) + } + STRATEGIES => { + delete_string(pipe, key); + Ok(()) + } _ => bail!("Unsupported operation: `delete` for collection '{collection}'"), } } -fn remove_index(conn: &mut Connection, key: &str, value: Option>>) -> Result<()> { +fn remove_index(pipe: &mut Pipeline, key: &str, value: Option>>) -> Result<()> { let value = value.ok_or_else(|| anyhow!("Empty `payload` for `delete` '{key}'"))?; let index_key = get_index_key(key)?; match index_key { - INDEX_ORDERS_OPEN => remove_from_set(conn, key, &value[0]), - INDEX_ORDERS_CLOSED => remove_from_set(conn, key, &value[0]), - INDEX_ORDERS_EMULATED => remove_from_set(conn, key, &value[0]), - INDEX_ORDERS_INFLIGHT => remove_from_set(conn, key, &value[0]), - INDEX_POSITIONS_OPEN => remove_from_set(conn, key, &value[0]), - 
INDEX_POSITIONS_CLOSED => remove_from_set(conn, key, &value[0]), + INDEX_ORDERS_OPEN => { + remove_from_set(pipe, key, &value[0]); + Ok(()) + } + INDEX_ORDERS_CLOSED => { + remove_from_set(pipe, key, &value[0]); + Ok(()) + } + INDEX_ORDERS_EMULATED => { + remove_from_set(pipe, key, &value[0]); + Ok(()) + } + INDEX_ORDERS_INFLIGHT => { + remove_from_set(pipe, key, &value[0]); + Ok(()) + } + INDEX_POSITIONS_OPEN => { + remove_from_set(pipe, key, &value[0]); + Ok(()) + } + INDEX_POSITIONS_CLOSED => { + remove_from_set(pipe, key, &value[0]); + Ok(()) + } _ => bail!("Unsupported index operation: remove from '{index_key}'"), } } -fn remove_from_set(conn: &mut Connection, key: &str, member: &Vec) -> Result<()> { - conn.srem(key, member) - .map_err(|e| anyhow!("Failed to srem '{key}' in Redis: {e}")) +fn remove_from_set(pipe: &mut Pipeline, key: &str, member: &Vec) { + pipe.srem(key, member); } -fn delete_string(conn: &mut Connection, key: &str) -> Result<()> { - conn.del(key) - .map_err(|e| anyhow!("Failed to del '{key}' in Redis: {e}")) +fn delete_string(pipe: &mut Pipeline, key: &str) { + pipe.del(key); } fn get_redis_url(config: &HashMap) -> String { @@ -400,6 +521,13 @@ fn get_redis_url(config: &HashMap) -> String { ) } +fn get_buffer_interval(config: &HashMap) -> Duration { + let buffer_interval_ms = config + .get("buffer_interval_ms") + .and_then(|v| v.as_u64()); + Duration::from_millis(buffer_interval_ms.unwrap_or(10)) +} + fn get_trader_key( trader_id: TraderId, instance_id: UUID4, diff --git a/nautilus_trader/config/common.py b/nautilus_trader/config/common.py index cbaf2f5924fa..82f2141b68c5 100644 --- a/nautilus_trader/config/common.py +++ b/nautilus_trader/config/common.py @@ -145,6 +145,8 @@ class CacheDatabaseConfig(NautilusConfig, frozen=True): If database should use an SSL enabled connection. flush_on_start : bool, default False If database should be flushed on start. 
+ buffer_interval_ms : PositiveInt, optional + The buffer interval (milliseconds) between pipelined/batched transactions. use_trader_prefix : bool, default True If a 'trader-' prefix is applied to keys. use_instance_id : bool, default False @@ -164,6 +166,7 @@ class CacheDatabaseConfig(NautilusConfig, frozen=True): password: str | None = None ssl: bool = False flush_on_start: bool = False + buffer_interval_ms: PositiveInt | None = None use_trader_prefix: bool = True use_instance_id: bool = False encoding: str = "msgpack"