Skip to content

Commit

Permalink
Refine DatabentoLiveClient logging
Browse files Browse the repository at this point in the history
  • Loading branch information
cjdsellers committed Mar 7, 2024
1 parent e8bbc11 commit 2d6228a
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 5 deletions.
2 changes: 1 addition & 1 deletion examples/live/databento/databento_subscriber.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@
# Configure the trading node
config_node = TradingNodeConfig(
trader_id=TraderId("TESTER-001"),
logging=LoggingConfig(log_level="INFO"),
logging=LoggingConfig(log_level="INFO", use_pyo3=True),
exec_engine=LiveExecEngineConfig(
reconciliation=False, # Not applicable
inflight_check_interval_ms=0, # Not applicable
Expand Down
14 changes: 10 additions & 4 deletions nautilus_core/adapters/src/databento/live.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use databento::{
live::Subscription,
};
use indexmap::IndexMap;
use log::{error, info};
use log::{debug, error, info, trace};
use nautilus_core::{
python::{to_pyruntime_err, to_pyvalue_err},
time::{get_atomic_clock_realtime, AtomicTime},
Expand Down Expand Up @@ -101,6 +101,7 @@ impl DatabentoFeedHandler {
}

pub async fn run(&mut self) -> Result<()> {
debug!("Running feed handler...");
let clock = get_atomic_clock_realtime();
let mut symbol_map = PitSymbolMap::new();
let mut instrument_id_map: HashMap<u32, InstrumentId> = HashMap::new();
Expand Down Expand Up @@ -132,6 +133,7 @@ impl DatabentoFeedHandler {
self.replay = true;
}
client.subscribe(&sub).await.map_err(to_pyruntime_err)?;
debug!("DatabentoClient subscribing to {:?}", sub);
}
LiveCommand::UpdateGlbx(map) => self.glbx_exchange_map = map,
LiveCommand::Start => {
Expand All @@ -141,10 +143,12 @@ impl DatabentoFeedHandler {
};
client.start().await.map_err(to_pyruntime_err)?;
running = true;
debug!("DatabentoClient started");
}
LiveCommand::Close => {
if running {
client.close().await.map_err(to_pyruntime_err)?;
debug!("DatabentoClient closed");
}
return Ok(());
}
Expand Down Expand Up @@ -274,17 +278,19 @@ impl DatabentoFeedHandler {
}

fn send_msg(&mut self, msg: LiveMessage) {
self.tx.send(msg).expect("Error sending message")
trace!("Sending message {:?}", msg);
match self.tx.send(msg) {
Ok(_) => {}
Err(e) => error!("Error sending message: {:?}", e),
}
}
}

fn handle_error_msg(msg: &dbn::ErrorMsg) {
eprintln!("{msg:?}"); // TODO: Just print stderr for now
error!("{:?}", msg);
}

fn handle_system_msg(msg: &dbn::SystemMsg) {
println!("{msg:?}"); // TODO: Just print stdout for now
info!("{:?}", msg);
}

Expand Down
4 changes: 4 additions & 0 deletions nautilus_core/adapters/src/databento/python/live.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use std::{
use anyhow::anyhow;
use databento::live::Subscription;
use indexmap::IndexMap;
use log::debug;
use nautilus_common::runtime::get_runtime;
use nautilus_core::{
python::{to_pyruntime_err, to_pyvalue_err},
Expand Down Expand Up @@ -68,6 +69,7 @@ impl DatabentoLiveClient {
callback: PyObject,
callback_pyo3: PyObject,
) -> PyResult<()> {
debug!("Processing messages...");
// Continue to process messages until channel is hung up
while let Ok(msg) = rx.recv() {
match msg {
Expand Down Expand Up @@ -189,11 +191,13 @@ impl DatabentoLiveClient {
HashMap::new(),
);

debug!("Starting feed handler");
let rt = get_runtime();
rt.spawn(async move { feed_handler.run().await });

self.send_command(LiveCommand::Start)?;

debug!("Spawning message processing thread");
let join_handle =
thread::spawn(move || Self::process_messages(rx_msg, callback, callback_pyo3));
self.join_handle = Some(join_handle);
Expand Down

0 comments on commit 2d6228a

Please sign in to comment.