From d997fd0954b77b6386cff7d3fccc3ea1ab1fc26e Mon Sep 17 00:00:00 2001 From: Chris Sellers Date: Tue, 6 Feb 2024 19:59:20 +1100 Subject: [PATCH] Refine DatabentoLiveClient --- .../live/databento/databento_subscriber.py | 25 +++++----- .../adapters/src/databento/python/live.rs | 50 +++++++++++-------- 2 files changed, 41 insertions(+), 34 deletions(-) diff --git a/examples/live/databento/databento_subscriber.py b/examples/live/databento/databento_subscriber.py index f8d776914811..355ff2e39245 100644 --- a/examples/live/databento/databento_subscriber.py +++ b/examples/live/databento/databento_subscriber.py @@ -31,6 +31,7 @@ from nautilus_trader.model.data import OrderBookDeltas from nautilus_trader.model.data import QuoteTick from nautilus_trader.model.data import TradeTick +from nautilus_trader.model.enums import BookType from nautilus_trader.model.identifiers import InstrumentId from nautilus_trader.model.identifiers import TraderId from nautilus_trader.trading.strategy import Strategy @@ -42,9 +43,9 @@ # For correct subscription operation, you must specify all instruments to be immediately # subscribed for as part of the data client configuration instrument_ids = [ - InstrumentId.from_str("AAPL.XCHI"), InstrumentId.from_str("ESH4.GLBX"), - InstrumentId.from_str("ESM4.GLBX"), + # InstrumentId.from_str("ESM4.GLBX"), + # InstrumentId.from_str("AAPL.XCHI"), ] # Configure the trading node @@ -139,15 +140,15 @@ def on_start(self) -> None: # book_type=BookType.L3_MBO, # client_id=DATABENTO_CLIENT_ID, # ) - # self.subscribe_order_book_snapshots( - # instrument_id=instrument_id, - # book_type=BookType.L2_MBP, - # depth=10, - # client_id=DATABENTO_CLIENT_ID, - # interval_ms=100, - # ) - self.subscribe_quote_ticks(instrument_id, client_id=DATABENTO_CLIENT_ID) - self.subscribe_trade_ticks(instrument_id, client_id=DATABENTO_CLIENT_ID) + self.subscribe_order_book_snapshots( + instrument_id=instrument_id, + book_type=BookType.L2_MBP, + depth=10, + client_id=DATABENTO_CLIENT_ID, + interval_ms=100, + ) + # self.subscribe_quote_ticks(instrument_id, client_id=DATABENTO_CLIENT_ID) + # self.subscribe_trade_ticks(instrument_id, client_id=DATABENTO_CLIENT_ID) # self.request_quote_ticks(instrument_id) # self.request_trade_ticks(instrument_id) # self.request_bars(BarType.from_str(f"{instrument_id}-1-MINUTE-LAST-EXTERNAL")) @@ -179,7 +180,7 @@ def on_order_book(self, order_book: OrderBook) -> None: """ Actions to be performed when an order book update is received. """ - # self.log.info("\n" + order_book.pprint(10), LogColor.CYAN) + self.log.info("\n" + order_book.pprint(10), LogColor.CYAN) def on_quote_tick(self, tick: QuoteTick) -> None: """ diff --git a/nautilus_core/adapters/src/databento/python/live.rs b/nautilus_core/adapters/src/databento/python/live.rs index 933e83a47a1d..a70ec9ad79df 100644 --- a/nautilus_core/adapters/src/databento/python/live.rs +++ b/nautilus_core/adapters/src/databento/python/live.rs @@ -15,7 +15,7 @@ use std::fs; use std::str::FromStr; -use std::sync::{Arc, OnceLock}; +use std::sync::Arc; use anyhow::Result; use databento::live::Subscription; @@ -32,6 +32,7 @@ use nautilus_model::identifiers::instrument_id::InstrumentId; use nautilus_model::identifiers::symbol::Symbol; use nautilus_model::identifiers::venue::Venue; use nautilus_model::python::data::data_to_pycapsule; +use pyo3::exceptions::PyRuntimeError; use pyo3::prelude::*; use time::OffsetDateTime; use tokio::sync::Mutex; @@ -51,7 +52,7 @@ pub struct DatabentoLiveClient { pub key: String, #[pyo3(get)] pub dataset: String, - inner: OnceLock>>, + inner: Option>>, runtime: tokio::runtime::Runtime, publishers: Arc>, } @@ -66,14 +67,14 @@ impl DatabentoLiveClient { .await } - fn get_inner_client(&self) -> Result>, databento::Error> { - if let Some(client) = self.inner.get() { - Ok(client.clone()) - } else { - let client = self.runtime.block_on(self.initialize_client())?; - let arc_client = Arc::new(Mutex::new(client)); - let _ = self.inner.set(arc_client.clone()); - Ok(arc_client) + fn get_inner_client(&mut self) -> Result>, databento::Error> { + match &self.inner { + Some(client) => Ok(client.clone()), + None => { + let client = self.runtime.block_on(self.initialize_client())?; + self.inner = Some(Arc::new(Mutex::new(client))); + Ok(self.inner.clone().unwrap()) + } } } } @@ -93,7 +94,7 @@ impl DatabentoLiveClient { Ok(Self { key, dataset, - inner: OnceLock::new(), + inner: None, runtime: tokio::runtime::Runtime::new()?, publishers: Arc::new(publishers), }) @@ -101,7 +102,7 @@ impl DatabentoLiveClient { #[pyo3(name = "subscribe")] fn py_subscribe<'py>( - &self, + &mut self, py: Python<'py>, schema: String, symbols: String, @@ -142,7 +143,7 @@ impl DatabentoLiveClient { } #[pyo3(name = "start")] - fn py_start<'py>(&self, py: Python<'py>, callback: PyObject) -> PyResult<&'py PyAny> { + fn py_start<'py>(&mut self, py: Python<'py>, callback: PyObject) -> PyResult<&'py PyAny> { let arc_client = self.get_inner_client().map_err(to_pyruntime_err)?; let publishers = self.publishers.clone(); @@ -257,16 +258,21 @@ impl DatabentoLiveClient { }) } - // TODO: Close wants to take ownership which isn't possible? #[pyo3(name = "close")] - fn py_close<'py>(&self, py: Python<'py>) -> PyResult<&'py PyAny> { - // let arc_client = self.get_inner_client().map_err(to_pyvalue_err)?; - - pyo3_asyncio::tokio::future_into_py(py, async move { - // let client = arc_client.lock_owned().await; - // client.close().await.map_err(to_pyvalue_err)?; - Ok(()) - }) + fn py_close<'py>(&mut self, py: Python<'py>) -> PyResult<&'py PyAny> { + match self.inner.take() { + Some(arc_client) => { + pyo3_asyncio::tokio::future_into_py(py, async move { + let _client = arc_client.lock_owned().await; + // Still need to determine how to take ownership here + // client.close().await.map_err(to_pyruntime_err) + Ok(()) + }) + } + None => Err(PyRuntimeError::new_err( + "Error on close: client was never started", + )), + } } }