Skip to content

Commit

Permalink
Join handle shutting down cache database
Browse files Browse the repository at this point in the history
  • Loading branch information
cjdsellers committed Mar 23, 2024
1 parent 2166e2a commit 2145c90
Show file tree
Hide file tree
Showing 4 changed files with 32 additions and 11 deletions.
1 change: 1 addition & 0 deletions nautilus_core/common/src/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ pub trait CacheDatabase {
instance_id: UUID4,
config: HashMap<String, serde_json::Value>,
) -> anyhow::Result<Self::DatabaseType>;
fn shutdown(&mut self) -> anyhow::Result<()>;
fn flushdb(&mut self) -> anyhow::Result<()>;
fn keys(&mut self, pattern: &str) -> anyhow::Result<Vec<String>>;
fn read(&mut self, key: &str) -> anyhow::Result<Vec<Vec<u8>>>;
Expand Down
5 changes: 5 additions & 0 deletions nautilus_core/infrastructure/src/python/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,11 @@ impl RedisCacheDatabase {
}
}

#[pyo3(name = "shutdown")]
fn py_shutdown(&mut self) -> PyResult<()> {
self.shutdown().map_err(to_pyruntime_err)
}

#[pyo3(name = "flushdb")]
fn py_flushdb(&mut self) -> PyResult<()> {
match self.flushdb() {
Expand Down
31 changes: 21 additions & 10 deletions nautilus_core/infrastructure/src/redis.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
use std::{
collections::{HashMap, VecDeque},
sync::mpsc::{channel, Receiver, Sender, TryRecvError},
thread,
thread::{self, JoinHandle},
time::{Duration, Instant},
};

Expand All @@ -28,7 +28,7 @@ use nautilus_core::{correctness::check_slice_not_empty, uuid::UUID4};
use nautilus_model::identifiers::trader_id::TraderId;
use redis::{Commands, Connection, Pipeline};
use serde_json::{json, Value};
use tracing::debug;
use tracing::{debug, error};

// Error constants
const FAILED_TX_CHANNEL: &str = "Failed to send to channel";
Expand Down Expand Up @@ -73,6 +73,7 @@ pub struct RedisCacheDatabase {
trader_key: String,
conn: Connection,
tx: Sender<DatabaseCommand>,
handle: Option<JoinHandle<()>>,
}

impl CacheDatabase for RedisCacheDatabase {
Expand All @@ -93,7 +94,7 @@ impl CacheDatabase for RedisCacheDatabase {
let trader_key = get_trader_key(trader_id, instance_id, &config);
let trader_key_clone = trader_key.clone();

let _join_handle = thread::Builder::new()
let handle = thread::Builder::new()
.name("cache".to_string())
.spawn(move || {
Self::handle_messages(rx, trader_key_clone, config);
Expand All @@ -105,9 +106,19 @@ impl CacheDatabase for RedisCacheDatabase {
trader_key,
conn,
tx,
handle: Some(handle),
})
}

fn shutdown(&mut self) -> anyhow::Result<()> {
debug!("Shutting down");
if let Some(handle) = self.handle.take() {
handle.join().map_err(|e| anyhow::anyhow!("{:?}", e))
} else {
Err(anyhow::anyhow!("Cache database already shutdown"))
}
}

fn flushdb(&mut self) -> anyhow::Result<()> {
match redis::cmd(FLUSHDB).query::<()>(&mut self.conn) {
Ok(_) => Ok(()),
Expand Down Expand Up @@ -212,7 +223,7 @@ fn drain_buffer(conn: &mut Connection, trader_key: &str, buffer: &mut VecDeque<D
let collection = match get_collection_key(&msg.key) {
Ok(collection) => collection,
Err(e) => {
eprintln!("{e}");
error!("{e}");
continue; // Continue to next message
}
};
Expand All @@ -222,7 +233,7 @@ fn drain_buffer(conn: &mut Connection, trader_key: &str, buffer: &mut VecDeque<D
match msg.op_type {
DatabaseOperation::Insert => {
if msg.payload.is_none() {
eprintln!("Null `payload` for `insert`");
error!("Null `payload` for `insert`");
continue; // Continue to next message
};

Expand All @@ -235,12 +246,12 @@ fn drain_buffer(conn: &mut Connection, trader_key: &str, buffer: &mut VecDeque<D
.collect::<Vec<&[u8]>>();

if let Err(e) = insert(&mut pipe, collection, &key, payload) {
eprintln!("{e}");
error!("{e}");
}
}
DatabaseOperation::Update => {
if msg.payload.is_none() {
eprintln!("Null `payload` for `update`");
error!("Null `payload` for `update`");
continue; // Continue to next message
};

Expand All @@ -253,7 +264,7 @@ fn drain_buffer(conn: &mut Connection, trader_key: &str, buffer: &mut VecDeque<D
.collect::<Vec<&[u8]>>();

if let Err(e) = update(&mut pipe, collection, &key, payload) {
eprintln!("{e}");
error!("{e}");
}
}
DatabaseOperation::Delete => {
Expand All @@ -264,14 +275,14 @@ fn drain_buffer(conn: &mut Connection, trader_key: &str, buffer: &mut VecDeque<D
.map(|v| v.iter().map(|v| v.as_slice()).collect::<Vec<&[u8]>>());

if let Err(e) = delete(&mut pipe, collection, &key, payload) {
eprintln!("{e}");
error!("{e}");
}
}
}
}

if let Err(e) = pipe.query::<()>(conn) {
eprintln!("{e}");
error!("{e}");
}
}

Expand Down
6 changes: 5 additions & 1 deletion nautilus_trader/cache/database.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ cdef class CacheDatabaseAdapter(CacheDatabaseFacade):
if config.buffer_interval_ms and config.buffer_interval_ms > 1000:
self._log.warning(
f"High `buffer_interval_ms` at {config.buffer_interval_ms}, "
"recommended range is [10, 1000] milliseconds.",
"recommended range is [10, 1000] milliseconds",
)

# Configuration
Expand All @@ -162,6 +162,10 @@ cdef class CacheDatabaseAdapter(CacheDatabaseFacade):
config_json=msgspec.json.encode(config),
)

def __del__(self) -> None:
self._log.info("Shutting down cache database")
self._backing.shutdown()

# -- COMMANDS -------------------------------------------------------------------------------------

cpdef void flush(self):
Expand Down

0 comments on commit 2145c90

Please sign in to comment.