From 3d54cecb5bf48ddea47d9135ade7735e6e013df0 Mon Sep 17 00:00:00 2001 From: Chris Sellers Date: Thu, 22 Feb 2024 20:50:25 +1100 Subject: [PATCH] Add OrderBookDeltas to_pyo3 conversion --- nautilus_core/model/src/python/data/deltas.rs | 15 ++++- nautilus_trader/common/actor.pxd | 20 +++--- nautilus_trader/common/actor.pyx | 24 +++++-- .../strategies/orderbook_imbalance_rust.py | 10 +-- nautilus_trader/model/data.pxd | 4 +- nautilus_trader/model/data.pyx | 63 +++++++++++-------- .../tracemalloc_orderbook_deltas.py | 16 ++++- tests/unit_tests/model/test_orderbook_data.py | 12 ++++ 8 files changed, 111 insertions(+), 53 deletions(-) diff --git a/nautilus_core/model/src/python/data/deltas.rs b/nautilus_core/model/src/python/data/deltas.rs index cd796c87d96f..21e58a95f117 100644 --- a/nautilus_core/model/src/python/data/deltas.rs +++ b/nautilus_core/model/src/python/data/deltas.rs @@ -16,13 +16,15 @@ use std::{ collections::hash_map::DefaultHasher, hash::{Hash, Hasher}, + ops::Deref, }; use nautilus_core::time::UnixNanos; -use pyo3::{prelude::*, pyclass::CompareOp}; +use pyo3::{prelude::*, pyclass::CompareOp, types::PyCapsule}; use crate::{ data::{delta::OrderBookDelta, deltas::OrderBookDeltas}, + ffi::data::deltas::OrderBookDeltas_API, identifiers::instrument_id::InstrumentId, python::common::PY_MODULE_MODEL, }; @@ -99,6 +101,17 @@ impl OrderBookDeltas { format!("{}:{}", PY_MODULE_MODEL, stringify!(OrderBookDeltas)) } + #[staticmethod] + #[pyo3(name = "from_pycapsule")] + pub fn py_from_pycapsule(capsule: &PyAny) -> OrderBookDeltas { + let capsule: &PyCapsule = capsule + .downcast() + .expect("Error on downcast to `&PyCapsule`"); + let data: &OrderBookDeltas_API = + unsafe { &*(capsule.pointer() as *const OrderBookDeltas_API) }; + data.deref().clone() + } + // /// Creates a `PyCapsule` containing a raw pointer to a `Data::Delta` object. // /// // /// This function takes the current object (assumed to be of a type that can be represented as diff --git a/nautilus_trader/common/actor.pxd b/nautilus_trader/common/actor.pxd index 78246fb6c3dd..270b4e399f7c 100644 --- a/nautilus_trader/common/actor.pxd +++ b/nautilus_trader/common/actor.pxd @@ -51,13 +51,14 @@ from nautilus_trader.portfolio.base cimport PortfolioFacade cdef class Actor(Component): cdef object _executor - cdef set _warning_events - cdef dict _signal_classes - cdef dict _pending_requests - cdef list _indicators - cdef dict _indicators_for_quotes - cdef dict _indicators_for_trades - cdef dict _indicators_for_bars + cdef set[type] _warning_events + cdef dict[str, type] _signal_classes + cdef dict[UUID4, object] _pending_requests + cdef list[Indicator] _indicators + cdef dict[InstrumentId, list[Indicator]] _indicators_for_quotes + cdef dict[InstrumentId, list[Indicator]] _indicators_for_trades + cdef dict[BarType, list[Indicator]] _indicators_for_bars + cdef set[type] _pyo3_conversion_types cdef readonly PortfolioFacade portfolio """The read-only portfolio for the actor.\n\n:returns: `PortfolioFacade`""" @@ -89,7 +90,7 @@ cdef class Actor(Component): cpdef void on_instrument_status(self, InstrumentStatus data) cpdef void on_instrument_close(self, InstrumentClose data) cpdef void on_instrument(self, Instrument instrument) - cpdef void on_order_book_deltas(self, OrderBookDeltas deltas) + cpdef void on_order_book_deltas(self, deltas) cpdef void on_order_book(self, OrderBook order_book) cpdef void on_quote_tick(self, QuoteTick tick) cpdef void on_trade_tick(self, TradeTick tick) @@ -144,6 +145,7 @@ cdef class Actor(Component): dict kwargs=*, ClientId client_id=*, bint managed=*, + bint pyo3_conversion=*, ) cpdef void subscribe_order_book_snapshots( self, @@ -231,7 +233,7 @@ cdef class Actor(Component): cpdef void handle_instrument(self, Instrument instrument) cpdef void handle_instruments(self, list instruments) cpdef void handle_order_book(self, OrderBook order_book) - cpdef void handle_order_book_deltas(self, OrderBookDeltas deltas) + cpdef void handle_order_book_deltas(self, deltas) cpdef void handle_quote_tick(self, QuoteTick tick) cpdef void handle_quote_ticks(self, list ticks) cpdef void handle_trade_tick(self, TradeTick tick) diff --git a/nautilus_trader/common/actor.pyx b/nautilus_trader/common/actor.pyx index 7f43f90e9ba0..40722df2efb1 100644 --- a/nautilus_trader/common/actor.pyx +++ b/nautilus_trader/common/actor.pyx @@ -125,6 +125,8 @@ cdef class Actor(Component): self._indicators_for_trades: dict[InstrumentId, list[Indicator]] = {} self._indicators_for_bars: dict[BarType, list[Indicator]] = {} + self._pyo3_conversion_types = set() + # Configuration self.config = config @@ -382,13 +384,13 @@ cdef class Actor(Component): """ # Optionally override in subclass - cpdef void on_order_book_deltas(self, OrderBookDeltas deltas): + cpdef void on_order_book_deltas(self, deltas): """ Actions to be performed when running and receives order book deltas. Parameters ---------- - deltas : OrderBookDeltas + deltas : OrderBookDeltas or nautilus_pyo3.OrderBookDeltas The order book deltas received. Warnings @@ -1185,6 +1187,7 @@ cdef class Actor(Component): dict kwargs = None, ClientId client_id = None, bint managed = True, + bint pyo3_conversion = False, ): """ Subscribe to the order book data stream, being a snapshot then deltas @@ -1205,11 +1208,17 @@ cdef class Actor(Component): If ``None`` then will be inferred from the venue in the instrument ID. managed : bool, default True If an order book should be managed by the data engine based on the subscribed feed. + pyo3_conversion : bool, default False + If received deltas should be converted to `nautilus_pyo3.OrderBookDeltas` + prior to being passed to the `on_order_book_deltas` handler. """ Condition.not_none(instrument_id, "instrument_id") Condition.true(self.trader_id is not None, "The actor has not been registered") + if pyo3_conversion: + self._pyo3_conversion_types.add(OrderBookDeltas) + self._msgbus.subscribe( topic=f"data.book.deltas" f".{instrument_id.venue}" @@ -2396,15 +2405,17 @@ cdef class Actor(Component): for i in range(length): self.handle_instrument(instruments[i]) - cpdef void handle_order_book_deltas(self, OrderBookDeltas deltas): + cpdef void handle_order_book_deltas(self, deltas): """ Handle the given order book deltas. - Passes to `on_order_book_delta` if state is ``RUNNING``. + Passes to `on_order_book_deltas` if state is ``RUNNING``. + The `deltas` will be `nautilus_pyo3.OrderBookDeltas` if the + pyo3_conversion flag was set for the subscription. Parameters ---------- - deltas : OrderBookDeltas + deltas : OrderBookDeltas or nautilus_pyo3.OrderBookDeltas The order book deltas received. Warnings @@ -2414,6 +2425,9 @@ cdef class Actor(Component): """ Condition.not_none(deltas, "deltas") + if OrderBookDeltas in self._pyo3_conversion_types: + deltas = deltas.to_pyo3() + if self._fsm.state == ComponentState.RUNNING: try: self.on_order_book_deltas(deltas) diff --git a/nautilus_trader/examples/strategies/orderbook_imbalance_rust.py b/nautilus_trader/examples/strategies/orderbook_imbalance_rust.py index 1cdfc5612689..f995cc7b0ceb 100644 --- a/nautilus_trader/examples/strategies/orderbook_imbalance_rust.py +++ b/nautilus_trader/examples/strategies/orderbook_imbalance_rust.py @@ -24,8 +24,6 @@ from nautilus_trader.core.nautilus_pyo3 import OrderBookMbp from nautilus_trader.core.rust.common import LogColor from nautilus_trader.model.book import OrderBook -from nautilus_trader.model.data import OrderBookDelta -from nautilus_trader.model.data import OrderBookDeltas from nautilus_trader.model.data import QuoteTick from nautilus_trader.model.enums import BookType from nautilus_trader.model.enums import OrderSide @@ -138,18 +136,16 @@ def on_start(self) -> None: self.instrument.id, self.book_type, managed=False, # <-- Manually applying deltas to book + pyo3_conversion=True, # <--- Will automatically convert to pyo3 objects ) self._last_trigger_timestamp = self.clock.utc_now() - def on_order_book_deltas(self, deltas: OrderBookDeltas) -> None: + def on_order_book_deltas(self, pyo3_deltas: nautilus_pyo3.OrderBookDeltas) -> None: """ Actions to be performed when order book deltas are received. """ - # Convert to pyo3 objects (the efficiency of this can improve) - pyo3_deltas = OrderBookDelta.to_pyo3_list(deltas.deltas) - for pyo3_delta in pyo3_deltas: - self.book.apply_delta(pyo3_delta) + self.book.apply_deltas(pyo3_deltas) self.imbalance.handle_book_mbp(self.book) self.check_trigger() diff --git a/nautilus_trader/model/data.pxd b/nautilus_trader/model/data.pxd index 21eaf3945cdc..92f0c13dc169 100644 --- a/nautilus_trader/model/data.pxd +++ b/nautilus_trader/model/data.pxd @@ -52,7 +52,7 @@ cpdef list capsule_to_list(capsule) cpdef Data capsule_to_data(capsule) cdef inline void capsule_destructor(object capsule): - cdef CVec* cvec = PyCapsule_GetPointer(capsule, NULL) + cdef CVec *cvec = PyCapsule_GetPointer(capsule, NULL) PyMem_Free(cvec[0].ptr) # de-allocate buffer PyMem_Free(cvec) # de-allocate cvec @@ -244,6 +244,8 @@ cdef class OrderBookDeltas(Data): @staticmethod cdef dict to_dict_c(OrderBookDeltas obj) + cpdef to_pyo3(self) + cdef class OrderBookDepth10(Data): cdef OrderBookDepth10_t _mem diff --git a/nautilus_trader/model/data.pyx b/nautilus_trader/model/data.pyx index 0ec97c7c9355..a24cb7dbdb52 100644 --- a/nautilus_trader/model/data.pyx +++ b/nautilus_trader/model/data.pyx @@ -1888,12 +1888,12 @@ cdef class OrderBookDelta(Data): ) @staticmethod - cdef inline list capsule_to_list_c(object capsule): + cdef list[OrderBookDelta] capsule_to_list_c(object capsule): # SAFETY: Do NOT deallocate the capsule here # It is supposed to be deallocated by the creator cdef CVec* data = PyCapsule_GetPointer(capsule, NULL) cdef OrderBookDelta_t* ptr = data.ptr - cdef list deltas = [] + cdef list[OrderBookDelta] deltas = [] cdef uint64_t i for i in range(0, data.len): @@ -1902,18 +1902,18 @@ cdef class OrderBookDelta(Data): return deltas @staticmethod - cdef inline list_to_capsule_c(list items): + cdef object list_to_capsule_c(list items): # Create a C struct buffer cdef uint64_t len_ = len(items) - cdef OrderBookDelta_t * data = PyMem_Malloc(len_ * sizeof(OrderBookDelta_t)) + cdef OrderBookDelta_t *data = PyMem_Malloc(len_ * sizeof(OrderBookDelta_t)) cdef uint64_t i for i in range(len_): - data[i] = ( items[i])._mem + data[i] = (items[i])._mem if not data: raise MemoryError() # Create CVec - cdef CVec * cvec = PyMem_Malloc(1 * sizeof(CVec)) + cdef CVec *cvec = PyMem_Malloc(1 * sizeof(CVec)) cvec.ptr = data cvec.len = len_ cvec.cap = len_ @@ -2153,7 +2153,7 @@ cdef class OrderBookDeltas(Data): cdef uint64_t len_ = len(deltas) # Create a C OrderBookDeltas_t buffer - cdef OrderBookDelta_t* data = PyMem_Malloc(len_ * sizeof(OrderBookDelta_t)) + cdef OrderBookDelta_t *data = PyMem_Malloc(len_ * sizeof(OrderBookDelta_t)) if not data: raise MemoryError() @@ -2164,7 +2164,7 @@ cdef class OrderBookDeltas(Data): data[i] = delta._mem # Create CVec - cdef CVec* cvec = PyMem_Malloc(1 * sizeof(CVec)) + cdef CVec *cvec = PyMem_Malloc(1 * sizeof(CVec)) if not cvec: raise MemoryError() @@ -2195,7 +2195,7 @@ cdef class OrderBookDeltas(Data): cdef uint64_t len_ = len(deltas) # Create a C OrderBookDeltas_t buffer - cdef OrderBookDelta_t* data = PyMem_Malloc(len_ * sizeof(OrderBookDelta_t)) + cdef OrderBookDelta_t *data = PyMem_Malloc(len_ * sizeof(OrderBookDelta_t)) if not data: raise MemoryError() @@ -2206,7 +2206,7 @@ cdef class OrderBookDeltas(Data): data[i] = delta._mem # Create CVec - cdef CVec* cvec = PyMem_Malloc(1 * sizeof(CVec)) + cdef CVec *cvec = PyMem_Malloc(1 * sizeof(CVec)) if not cvec: raise MemoryError() @@ -2386,6 +2386,15 @@ cdef class OrderBookDeltas(Data): """ return OrderBookDeltas.to_dict_c(obj) + cpdef to_pyo3(self): + cdef OrderBookDeltas_API *data = PyMem_Malloc(sizeof(OrderBookDeltas_API)) + data[0] = self._mem + capsule = PyCapsule_New(data, NULL, NULL) + deltas = nautilus_pyo3.OrderBookDeltas.from_pycapsule(capsule) + PyMem_Free(data) + return deltas + + cdef class OrderBookDepth10(Data): """ @@ -2747,12 +2756,12 @@ cdef class OrderBookDepth10(Data): } @staticmethod - cdef inline list capsule_to_list_c(object capsule): + cdef list[OrderBookDepth10] capsule_to_list_c(object capsule): # SAFETY: Do NOT deallocate the capsule here # It is supposed to be deallocated by the creator cdef CVec* data = PyCapsule_GetPointer(capsule, NULL) cdef OrderBookDepth10_t* ptr = data.ptr - cdef list depths = [] + cdef list[OrderBookDepth10] depths = [] cdef uint64_t i for i in range(0, data.len): @@ -2761,10 +2770,10 @@ cdef class OrderBookDepth10(Data): return depths @staticmethod - cdef inline list_to_capsule_c(list items): + cdef object list_to_capsule_c(list items): # Create a C struct buffer cdef uint64_t len_ = len(items) - cdef OrderBookDepth10_t * data = PyMem_Malloc(len_ * sizeof(OrderBookDepth10_t)) + cdef OrderBookDepth10_t * data = PyMem_Malloc(len_ * sizeof(OrderBookDepth10_t)) cdef uint64_t i for i in range(len_): data[i] = (items[i])._mem @@ -2772,7 +2781,7 @@ cdef class OrderBookDepth10(Data): raise MemoryError() # Create CVec - cdef CVec * cvec = PyMem_Malloc(1 * sizeof(CVec)) + cdef CVec * cvec = PyMem_Malloc(1 * sizeof(CVec)) cvec.ptr = data cvec.len = len_ cvec.cap = len_ @@ -3425,12 +3434,12 @@ cdef class QuoteTick(Data): return quote @staticmethod - cdef inline list capsule_to_list_c(object capsule): + cdef list[QuoteTick] capsule_to_list_c(object capsule): # SAFETY: Do NOT deallocate the capsule here # It is supposed to be deallocated by the creator cdef CVec* data = PyCapsule_GetPointer(capsule, NULL) cdef QuoteTick_t* ptr = data.ptr - cdef list quotes = [] + cdef list[QuoteTick] quotes = [] cdef uint64_t i for i in range(0, data.len): @@ -3439,18 +3448,18 @@ cdef class QuoteTick(Data): return quotes @staticmethod - cdef inline list_to_capsule_c(list items): + cdef object list_to_capsule_c(list items): # Create a C struct buffer cdef uint64_t len_ = len(items) - cdef QuoteTick_t * data = PyMem_Malloc(len_ * sizeof(QuoteTick_t)) + cdef QuoteTick_t * data = PyMem_Malloc(len_ * sizeof(QuoteTick_t)) cdef uint64_t i for i in range(len_): - data[i] = ( items[i])._mem + data[i] = (items[i])._mem if not data: raise MemoryError() # Create CVec - cdef CVec * cvec = PyMem_Malloc(1 * sizeof(CVec)) + cdef CVec *cvec = PyMem_Malloc(1 * sizeof(CVec)) cvec.ptr = data cvec.len = len_ cvec.cap = len_ @@ -3916,12 +3925,12 @@ cdef class TradeTick(Data): return trade @staticmethod - cdef inline list capsule_to_list_c(capsule): + cdef list[TradeTick] capsule_to_list_c(capsule): # SAFETY: Do NOT deallocate the capsule here # It is supposed to be deallocated by the creator cdef CVec* data = PyCapsule_GetPointer(capsule, NULL) cdef TradeTick_t* ptr = data.ptr - cdef list trades = [] + cdef list[TradeTick] trades = [] cdef uint64_t i for i in range(0, data.len): @@ -3930,18 +3939,18 @@ cdef class TradeTick(Data): return trades @staticmethod - cdef inline list_to_capsule_c(list items): + cdef object list_to_capsule_c(list items): # Create a C struct buffer cdef uint64_t len_ = len(items) - cdef TradeTick_t * data = PyMem_Malloc(len_ * sizeof(TradeTick_t)) + cdef TradeTick_t *data = PyMem_Malloc(len_ * sizeof(TradeTick_t)) cdef uint64_t i for i in range(len_): - data[i] = ( items[i])._mem + data[i] = (items[i])._mem if not data: raise MemoryError() # Create CVec - cdef CVec* cvec = PyMem_Malloc(1 * sizeof(CVec)) + cdef CVec *cvec = PyMem_Malloc(1 * sizeof(CVec)) cvec.ptr = data cvec.len = len_ cvec.cap = len_ diff --git a/tests/mem_leak_tests/tracemalloc_orderbook_deltas.py b/tests/mem_leak_tests/tracemalloc_orderbook_deltas.py index b12f2cb3d78e..69538992c693 100644 --- a/tests/mem_leak_tests/tracemalloc_orderbook_deltas.py +++ b/tests/mem_leak_tests/tracemalloc_orderbook_deltas.py @@ -20,13 +20,22 @@ @snapshot_memory(4000) -def run_repr(*args, **kwargs): +def run_to_pyo3(*args, **kwargs): delta = TestDataStubs.order_book_delta() deltas = OrderBookDeltas(delta.instrument_id, deltas=[delta] * 1024) - repr(deltas.deltas) + pyo3_deltas = deltas.to_pyo3() + repr(pyo3_deltas) repr(deltas) +# @snapshot_memory(4000) +# def run_repr(*args, **kwargs): +# delta = TestDataStubs.order_book_delta() +# deltas = OrderBookDeltas(delta.instrument_id, deltas=[delta] * 1024) +# repr(deltas.deltas) +# repr(deltas) + + # @snapshot_memory(4000) # def run_from_pyo3(*args, **kwargs): # pyo3_delta = TestDataProviderPyo3.order_book_delta() @@ -34,5 +43,6 @@ def run_repr(*args, **kwargs): if __name__ == "__main__": - run_repr() + run_to_pyo3() + # run_repr() # run_from_pyo3() diff --git a/tests/unit_tests/model/test_orderbook_data.py b/tests/unit_tests/model/test_orderbook_data.py index 65054520ec95..0ee6814fc351 100644 --- a/tests/unit_tests/model/test_orderbook_data.py +++ b/tests/unit_tests/model/test_orderbook_data.py @@ -395,6 +395,18 @@ def test_deltas_pickle_round_trip() -> None: assert len(deltas.deltas) == len(unpickled.deltas) +def test_deltas_to_pyo3() -> None: + # Arrange + deltas = TestDataStubs.order_book_deltas() + + # Act + pyo3_deltas = deltas.to_pyo3() + + # Assert + assert isinstance(pyo3_deltas, nautilus_pyo3.OrderBookDeltas) + assert len(pyo3_deltas.deltas) == len(deltas.deltas) + + def test_deltas_hash_str_and_repr() -> None: # Arrange order1 = BookOrder(