From 2145c90ac83ce46dc6ab8d913ee42f0ebe6afd8b Mon Sep 17 00:00:00 2001 From: Chris Sellers Date: Sat, 23 Mar 2024 18:32:35 +1100 Subject: [PATCH] Join handle shutting down cache database --- nautilus_core/common/src/cache.rs | 1 + .../infrastructure/src/python/cache.rs | 5 +++ nautilus_core/infrastructure/src/redis.rs | 31 +++++++++++++------ nautilus_trader/cache/database.pyx | 6 +++- 4 files changed, 32 insertions(+), 11 deletions(-) diff --git a/nautilus_core/common/src/cache.rs b/nautilus_core/common/src/cache.rs index 18893356eda7..b2d3d08c161b 100644 --- a/nautilus_core/common/src/cache.rs +++ b/nautilus_core/common/src/cache.rs @@ -83,6 +83,7 @@ pub trait CacheDatabase { instance_id: UUID4, config: HashMap, ) -> anyhow::Result; + fn shutdown(&mut self) -> anyhow::Result<()>; fn flushdb(&mut self) -> anyhow::Result<()>; fn keys(&mut self, pattern: &str) -> anyhow::Result>; fn read(&mut self, key: &str) -> anyhow::Result>>; diff --git a/nautilus_core/infrastructure/src/python/cache.rs b/nautilus_core/infrastructure/src/python/cache.rs index e8f86bcf622e..8f3f49a101db 100644 --- a/nautilus_core/infrastructure/src/python/cache.rs +++ b/nautilus_core/infrastructure/src/python/cache.rs @@ -38,6 +38,11 @@ impl RedisCacheDatabase { } } + #[pyo3(name = "shutdown")] + fn py_shutdown(&mut self) -> PyResult<()> { + self.shutdown().map_err(to_pyruntime_err) + } + #[pyo3(name = "flushdb")] fn py_flushdb(&mut self) -> PyResult<()> { match self.flushdb() { diff --git a/nautilus_core/infrastructure/src/redis.rs b/nautilus_core/infrastructure/src/redis.rs index e0a899a799fb..de9ae386c89b 100644 --- a/nautilus_core/infrastructure/src/redis.rs +++ b/nautilus_core/infrastructure/src/redis.rs @@ -16,7 +16,7 @@ use std::{ collections::{HashMap, VecDeque}, sync::mpsc::{channel, Receiver, Sender, TryRecvError}, - thread, + thread::{self, JoinHandle}, time::{Duration, Instant}, }; @@ -28,7 +28,7 @@ use nautilus_core::{correctness::check_slice_not_empty, uuid::UUID4}; use nautilus_model::identifiers::trader_id::TraderId; use redis::{Commands, Connection, Pipeline}; use serde_json::{json, Value}; -use tracing::debug; +use tracing::{debug, error}; // Error constants const FAILED_TX_CHANNEL: &str = "Failed to send to channel"; @@ -73,6 +73,7 @@ pub struct RedisCacheDatabase { trader_key: String, conn: Connection, tx: Sender, + handle: Option>, } impl CacheDatabase for RedisCacheDatabase { @@ -93,7 +94,7 @@ impl CacheDatabase for RedisCacheDatabase { let trader_key = get_trader_key(trader_id, instance_id, &config); let trader_key_clone = trader_key.clone(); - let _join_handle = thread::Builder::new() + let handle = thread::Builder::new() .name("cache".to_string()) .spawn(move || { Self::handle_messages(rx, trader_key_clone, config); @@ -105,9 +106,19 @@ impl CacheDatabase for RedisCacheDatabase { trader_key, conn, tx, + handle: Some(handle), }) } + fn shutdown(&mut self) -> anyhow::Result<()> { + debug!("Shutting down"); + if let Some(handle) = self.handle.take() { + handle.join().map_err(|e| anyhow::anyhow!("{:?}", e)) + } else { + Err(anyhow::anyhow!("Cache database already shutdown")) + } + } + fn flushdb(&mut self) -> anyhow::Result<()> { match redis::cmd(FLUSHDB).query::<()>(&mut self.conn) { Ok(_) => Ok(()), @@ -212,7 +223,7 @@ fn drain_buffer(conn: &mut Connection, trader_key: &str, buffer: &mut VecDeque collection, Err(e) => { - eprintln!("{e}"); + error!("{e}"); continue; // Continue to next message } }; @@ -222,7 +233,7 @@ fn drain_buffer(conn: &mut Connection, trader_key: &str, buffer: &mut VecDeque { if msg.payload.is_none() { - eprintln!("Null `payload` for `insert`"); + error!("Null `payload` for `insert`"); continue; // Continue to next message }; @@ -235,12 +246,12 @@ fn drain_buffer(conn: &mut Connection, trader_key: &str, buffer: &mut VecDeque>(); if let Err(e) = insert(&mut pipe, collection, &key, payload) { - eprintln!("{e}"); + error!("{e}"); } } DatabaseOperation::Update => { if msg.payload.is_none() { - eprintln!("Null `payload` for `update`"); + error!("Null `payload` for `update`"); continue; // Continue to next message }; @@ -253,7 +264,7 @@ fn drain_buffer(conn: &mut Connection, trader_key: &str, buffer: &mut VecDeque>(); if let Err(e) = update(&mut pipe, collection, &key, payload) { - eprintln!("{e}"); + error!("{e}"); } } DatabaseOperation::Delete => { @@ -264,14 +275,14 @@ fn drain_buffer(conn: &mut Connection, trader_key: &str, buffer: &mut VecDeque>()); if let Err(e) = delete(&mut pipe, collection, &key, payload) { - eprintln!("{e}"); + error!("{e}"); } } } } if let Err(e) = pipe.query::<()>(conn) { - eprintln!("{e}"); + error!("{e}"); } } diff --git a/nautilus_trader/cache/database.pyx b/nautilus_trader/cache/database.pyx index bed0e6c0cad5..94703e02f9b8 100644 --- a/nautilus_trader/cache/database.pyx +++ b/nautilus_trader/cache/database.pyx @@ -142,7 +142,7 @@ cdef class CacheDatabaseAdapter(CacheDatabaseFacade): if config.buffer_interval_ms and config.buffer_interval_ms > 1000: self._log.warning( f"High `buffer_interval_ms` at {config.buffer_interval_ms}, " - "recommended range is [10, 1000] milliseconds.", + "recommended range is [10, 1000] milliseconds", ) # Configuration @@ -162,6 +162,10 @@ cdef class CacheDatabaseAdapter(CacheDatabaseFacade): config_json=msgspec.json.encode(config), ) + def __del__(self) -> None: + self._log.info("Shutting down cache database") + self._backing.shutdown() + # -- COMMANDS ------------------------------------------------------------------------------------- cpdef void flush(self):