Skip to content

Commit

Permalink
Refine DatabentoLiveClient logging and docs
Browse files Browse the repository at this point in the history
  • Loading branch information
cjdsellers committed Mar 9, 2024
1 parent 1b3dd44 commit 194d10f
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 12 deletions.
7 changes: 7 additions & 0 deletions nautilus_core/adapters/src/databento/live.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,11 @@ pub enum LiveMessage {
Error(anyhow::Error),
}

/// Handles a raw TCP data feed from the Databento LSG for a single dataset.
///
/// [`LiveCommand`] messages are received synchronously across a channel;
/// decoded records are sent asynchronously on a tokio channel as [`LiveMessage`]s
/// back to a message processing task.
pub struct DatabentoFeedHandler {
key: String,
dataset: String,
Expand All @@ -76,6 +81,7 @@ pub struct DatabentoFeedHandler {
}

impl DatabentoFeedHandler {
/// Initializes a new instance of the [`DatabentoFeedHandler`].
#[must_use]
pub fn new(
key: String,
Expand All @@ -96,6 +102,7 @@ impl DatabentoFeedHandler {
}
}

/// Runs the feed handler to begin listening for commands and processing messages.
pub async fn run(&mut self) -> Result<()> {
debug!("Running feed handler");
let clock = get_atomic_clock_realtime();
Expand Down
24 changes: 12 additions & 12 deletions nautilus_core/adapters/src/databento/python/live.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@

use std::{collections::HashMap, fs, str::FromStr};

use anyhow::anyhow;
use databento::live::Subscription;
use indexmap::IndexMap;
use nautilus_core::{
Expand Down Expand Up @@ -104,9 +103,8 @@ fn call_python(py: Python, callback: &PyObject, py_obj: PyObject) {
impl DatabentoLiveClient {
#[new]
pub fn py_new(key: String, dataset: String, publishers_path: String) -> anyhow::Result<Self> {
let file_content = fs::read_to_string(publishers_path)?;
let publishers_vec: Vec<DatabentoPublisher> = serde_json::from_str(&file_content)?;

let publishers_json = fs::read_to_string(publishers_path)?;
let publishers_vec: Vec<DatabentoPublisher> = serde_json::from_str(&publishers_json)?;
let publisher_venue_map = publishers_vec
.into_iter()
.map(|p| (p.publisher_id, Venue::from(p.venue.as_str())))
Expand Down Expand Up @@ -170,20 +168,19 @@ impl DatabentoLiveClient {
callback_pyo3: PyObject,
) -> PyResult<&'py PyAny> {
if self.is_closed {
return Err(to_pyruntime_err("Client was already closed"));
return Err(to_pyruntime_err("Client is already closed"));
};
if self.is_running {
return Err(to_pyruntime_err("Client was already running"));
return Err(to_pyruntime_err("Client is already running"));
};
self.is_running = true;

let (tx_msg, rx_msg) = tokio::sync::mpsc::channel::<LiveMessage>(100_000);

// Consume the receiver
let rx_cmd = self
.rx_cmd
.take()
.ok_or_else(|| anyhow!("Client already started"))?;
// SAFETY: We guard the client from being started more than once with the
// `is_running` flag, so here it is safe to unwrap the command receiver.
let rx_cmd = self.rx_cmd.take().unwrap();

let mut feed_handler = DatabentoFeedHandler::new(
self.key.clone(),
Expand All @@ -199,8 +196,8 @@ impl DatabentoLiveClient {
pyo3_asyncio::tokio::future_into_py(py, async move {
let feed_handle = tokio::task::spawn(async move { feed_handler.run().await });
match Self::process_messages(rx_msg, callback, callback_pyo3).await {
Ok(()) => debug!("Recv handler completed"),
Err(e) => error!("Recv handler error: {e}"),
Ok(()) => debug!("Message processing completed"),
Err(e) => error!("Message processing error: {e}"),
}

match feed_handle.await {
Expand All @@ -220,6 +217,9 @@ impl DatabentoLiveClient {
if !self.is_running {
return Err(to_pyruntime_err("Client was never started"));
};
if self.is_closed {
return Err(to_pyruntime_err("Client is already closed"));
};

self.send_command(LiveCommand::Close)?;

Expand Down

0 comments on commit 194d10f

Please sign in to comment.