Skip to content

Commit

Permalink
Refine DatabentoLiveClient
Browse files Browse the repository at this point in the history
  • Loading branch information
cjdsellers committed Feb 6, 2024
1 parent 0f38e06 commit d997fd0
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 34 deletions.
25 changes: 13 additions & 12 deletions examples/live/databento/databento_subscriber.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
from nautilus_trader.model.data import OrderBookDeltas
from nautilus_trader.model.data import QuoteTick
from nautilus_trader.model.data import TradeTick
from nautilus_trader.model.enums import BookType
from nautilus_trader.model.identifiers import InstrumentId
from nautilus_trader.model.identifiers import TraderId
from nautilus_trader.trading.strategy import Strategy
Expand All @@ -42,9 +43,9 @@
# For correct subscription operation, you must specify all instruments to be immediately
# subscribed for as part of the data client configuration
instrument_ids = [
InstrumentId.from_str("AAPL.XCHI"),
InstrumentId.from_str("ESH4.GLBX"),
InstrumentId.from_str("ESM4.GLBX"),
# InstrumentId.from_str("ESM4.GLBX"),
# InstrumentId.from_str("AAPL.XCHI"),
]

# Configure the trading node
Expand Down Expand Up @@ -139,15 +140,15 @@ def on_start(self) -> None:
# book_type=BookType.L3_MBO,
# client_id=DATABENTO_CLIENT_ID,
# )
# self.subscribe_order_book_snapshots(
# instrument_id=instrument_id,
# book_type=BookType.L2_MBP,
# depth=10,
# client_id=DATABENTO_CLIENT_ID,
# interval_ms=100,
# )
self.subscribe_quote_ticks(instrument_id, client_id=DATABENTO_CLIENT_ID)
self.subscribe_trade_ticks(instrument_id, client_id=DATABENTO_CLIENT_ID)
self.subscribe_order_book_snapshots(
instrument_id=instrument_id,
book_type=BookType.L2_MBP,
depth=10,
client_id=DATABENTO_CLIENT_ID,
interval_ms=100,
)
# self.subscribe_quote_ticks(instrument_id, client_id=DATABENTO_CLIENT_ID)
# self.subscribe_trade_ticks(instrument_id, client_id=DATABENTO_CLIENT_ID)
# self.request_quote_ticks(instrument_id)
# self.request_trade_ticks(instrument_id)
# self.request_bars(BarType.from_str(f"{instrument_id}-1-MINUTE-LAST-EXTERNAL"))
Expand Down Expand Up @@ -179,7 +180,7 @@ def on_order_book(self, order_book: OrderBook) -> None:
"""
Actions to be performed when an order book update is received.
"""
# self.log.info("\n" + order_book.pprint(10), LogColor.CYAN)
self.log.info("\n" + order_book.pprint(10), LogColor.CYAN)

def on_quote_tick(self, tick: QuoteTick) -> None:
"""
Expand Down
50 changes: 28 additions & 22 deletions nautilus_core/adapters/src/databento/python/live.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

use std::fs;
use std::str::FromStr;
use std::sync::{Arc, OnceLock};
use std::sync::Arc;

use anyhow::Result;
use databento::live::Subscription;
Expand All @@ -32,6 +32,7 @@ use nautilus_model::identifiers::instrument_id::InstrumentId;
use nautilus_model::identifiers::symbol::Symbol;
use nautilus_model::identifiers::venue::Venue;
use nautilus_model::python::data::data_to_pycapsule;
use pyo3::exceptions::PyRuntimeError;
use pyo3::prelude::*;
use time::OffsetDateTime;
use tokio::sync::Mutex;
Expand All @@ -51,7 +52,7 @@ pub struct DatabentoLiveClient {
pub key: String,
#[pyo3(get)]
pub dataset: String,
inner: OnceLock<Arc<Mutex<databento::LiveClient>>>,
inner: Option<Arc<Mutex<databento::LiveClient>>>,
runtime: tokio::runtime::Runtime,
publishers: Arc<IndexMap<PublisherId, DatabentoPublisher>>,
}
Expand All @@ -66,14 +67,14 @@ impl DatabentoLiveClient {
.await
}

fn get_inner_client(&self) -> Result<Arc<Mutex<databento::LiveClient>>, databento::Error> {
if let Some(client) = self.inner.get() {
Ok(client.clone())
} else {
let client = self.runtime.block_on(self.initialize_client())?;
let arc_client = Arc::new(Mutex::new(client));
let _ = self.inner.set(arc_client.clone());
Ok(arc_client)
fn get_inner_client(&mut self) -> Result<Arc<Mutex<databento::LiveClient>>, databento::Error> {
match &self.inner {
Some(client) => Ok(client.clone()),
None => {
let client = self.runtime.block_on(self.initialize_client())?;
self.inner = Some(Arc::new(Mutex::new(client)));
Ok(self.inner.clone().unwrap())
}
}
}
}
Expand All @@ -93,15 +94,15 @@ impl DatabentoLiveClient {
Ok(Self {
key,
dataset,
inner: OnceLock::new(),
inner: None,
runtime: tokio::runtime::Runtime::new()?,
publishers: Arc::new(publishers),
})
}

#[pyo3(name = "subscribe")]
fn py_subscribe<'py>(
&self,
&mut self,
py: Python<'py>,
schema: String,
symbols: String,
Expand Down Expand Up @@ -142,7 +143,7 @@ impl DatabentoLiveClient {
}

#[pyo3(name = "start")]
fn py_start<'py>(&self, py: Python<'py>, callback: PyObject) -> PyResult<&'py PyAny> {
fn py_start<'py>(&mut self, py: Python<'py>, callback: PyObject) -> PyResult<&'py PyAny> {
let arc_client = self.get_inner_client().map_err(to_pyruntime_err)?;
let publishers = self.publishers.clone();

Expand Down Expand Up @@ -257,16 +258,21 @@ impl DatabentoLiveClient {
})
}

// TODO: Close wants to take ownership which isn't possible?
#[pyo3(name = "close")]
fn py_close<'py>(&self, py: Python<'py>) -> PyResult<&'py PyAny> {
// let arc_client = self.get_inner_client().map_err(to_pyvalue_err)?;

pyo3_asyncio::tokio::future_into_py(py, async move {
// let client = arc_client.lock_owned().await;
// client.close().await.map_err(to_pyvalue_err)?;
Ok(())
})
fn py_close<'py>(&mut self, py: Python<'py>) -> PyResult<&'py PyAny> {
match self.inner.take() {
Some(arc_client) => {
pyo3_asyncio::tokio::future_into_py(py, async move {
let _client = arc_client.lock_owned().await;
// Still need to determine how to take ownership here
// client.close().await.map_err(to_pyruntime_err)
Ok(())
})
}
None => Err(PyRuntimeError::new_err(
"Error on close: client was never started",
)),
}
}
}

Expand Down

0 comments on commit d997fd0

Please sign in to comment.