diff --git a/nautilus_core/model/src/python/data/bar.rs b/nautilus_core/model/src/python/data/bar.rs index 4f04c89df57a..7570f4d69c6e 100644 --- a/nautilus_core/model/src/python/data/bar.rs +++ b/nautilus_core/model/src/python/data/bar.rs @@ -16,6 +16,7 @@ use std::{ collections::{hash_map::DefaultHasher, HashMap}, hash::{Hash, Hasher}, + str::FromStr, }; use nautilus_core::{ @@ -116,6 +117,12 @@ impl BarType { fn py_fully_qualified_name() -> String { format!("{}:{}", PY_MODULE_MODEL, stringify!(BarType)) } + + #[staticmethod] + #[pyo3(name = "from_str")] + fn py_from_str(value: &str) -> PyResult { + BarType::from_str(value).map_err(to_pyvalue_err) + } } #[pymethods] diff --git a/nautilus_core/persistence/src/python/backend/transformer.rs b/nautilus_core/persistence/src/python/backend/transformer.rs index 422dbea37a97..cfcc0b8b7c2c 100644 --- a/nautilus_core/persistence/src/python/backend/transformer.rs +++ b/nautilus_core/persistence/src/python/backend/transformer.rs @@ -20,8 +20,8 @@ use datafusion::arrow::{ }; use nautilus_core::python::to_pyvalue_err; use nautilus_model::data::{ - bar::Bar, delta::OrderBookDelta, is_monotonically_increasing_by_init, quote::QuoteTick, - trade::TradeTick, + bar::Bar, delta::OrderBookDelta, depth::OrderBookDepth10, is_monotonically_increasing_by_init, + quote::QuoteTick, trade::TradeTick, }; use pyo3::{ exceptions::{PyRuntimeError, PyTypeError, PyValueError}, @@ -102,9 +102,9 @@ impl DataTransformer { } /// Transforms the given record `batches` into Python `bytes`. 
- fn record_batches_to_pybytes( + fn record_batch_to_pybytes( py: Python<'_>, - batches: Vec, + batch: RecordBatch, schema: Schema, ) -> PyResult> { // Create a cursor to write to a byte array in memory @@ -112,11 +112,10 @@ impl DataTransformer { { let mut writer = StreamWriter::try_new(&mut cursor, &schema) .map_err(|err| PyRuntimeError::new_err(format!("{err}")))?; - for batch in batches { - writer - .write(&batch) - .map_err(|err| PyRuntimeError::new_err(format!("{err}")))?; - } + + writer + .write(&batch) + .map_err(|err| PyRuntimeError::new_err(format!("{err}")))?; writer .finish() @@ -137,6 +136,7 @@ impl DataTransformer { let cls_str: &str = cls.getattr("__name__")?.extract()?; let result_map = match cls_str { stringify!(OrderBookDelta) => OrderBookDelta::get_schema_map(), + stringify!(OrderBookDepth10) => OrderBookDepth10::get_schema_map(), stringify!(QuoteTick) => QuoteTick::get_schema_map(), stringify!(TradeTick) => TradeTick::get_schema_map(), stringify!(Bar) => Bar::get_schema_map(), @@ -153,7 +153,7 @@ impl DataTransformer { /// Return Python `bytes` from the given list of 'legacy' data objects, which can be passed /// to `pa.ipc.open_stream` to create a `RecordBatchReader`. 
#[staticmethod] - pub fn pyobjects_to_batches_bytes( + pub fn pyobjects_to_record_batch_bytes( py: Python<'_>, data: Vec, ) -> PyResult> { @@ -172,19 +172,19 @@ impl DataTransformer { match data_type.as_str() { stringify!(OrderBookDelta) => { let deltas = Self::pyobjects_to_order_book_deltas(py, data)?; - Self::pyo3_order_book_deltas_to_batches_bytes(py, deltas) + Self::pyo3_order_book_deltas_to_record_batch_bytes(py, deltas) } stringify!(QuoteTick) => { let quotes = Self::pyobjects_to_quote_ticks(py, data)?; - Self::pyo3_quote_ticks_to_batch_bytes(py, quotes) + Self::pyo3_quote_ticks_to_record_batch_bytes(py, quotes) } stringify!(TradeTick) => { let trades = Self::pyobjects_to_trade_ticks(py, data)?; - Self::pyo3_trade_ticks_to_batches_bytes(py, trades) + Self::pyo3_trade_ticks_to_record_batch_bytes(py, trades) } stringify!(Bar) => { let bars = Self::pyobjects_to_bars(py, data)?; - Self::pyo3_bars_to_batches_bytes(py, bars) + Self::pyo3_bars_to_record_batch_bytes(py, bars) } _ => Err(PyValueError::new_err(format!( "unsupported data type: {data_type}" @@ -193,7 +193,7 @@ impl DataTransformer { } #[staticmethod] - pub fn pyo3_order_book_deltas_to_batches_bytes( + pub fn pyo3_order_book_deltas_to_record_batch_bytes( py: Python<'_>, data: Vec, ) -> PyResult> { @@ -216,14 +216,44 @@ impl DataTransformer { match result { Ok(batch) => { let schema = OrderBookDelta::get_schema(Some(metadata)); - Self::record_batches_to_pybytes(py, vec![batch], schema) + Self::record_batch_to_pybytes(py, batch, schema) + } + Err(e) => Err(to_pyvalue_err(e)), + } + } + + #[staticmethod] + pub fn pyo3_order_book_depth10_to_record_batch_bytes( + py: Python<'_>, + data: Vec, + ) -> PyResult> { + if data.is_empty() { + return Err(PyValueError::new_err(ERROR_EMPTY_DATA)); + } + + // Take first element and extract metadata + // SAFETY: Unwrap safe as already checked that `data` not empty + let first = data.first().unwrap(); + let metadata = OrderBookDepth10::get_metadata( + &first.instrument_id, 
+ first.bids[0].price.precision, + first.bids[0].size.precision, + ); + + let result: Result = + OrderBookDepth10::encode_batch(&metadata, &data); + + match result { + Ok(batch) => { + let schema = OrderBookDepth10::get_schema(Some(metadata)); + Self::record_batch_to_pybytes(py, batch, schema) } Err(e) => Err(to_pyvalue_err(e)), } } #[staticmethod] - pub fn pyo3_quote_ticks_to_batch_bytes( + pub fn pyo3_quote_ticks_to_record_batch_bytes( py: Python<'_>, data: Vec, ) -> PyResult> { @@ -245,14 +275,14 @@ impl DataTransformer { match result { Ok(batch) => { let schema = QuoteTick::get_schema(Some(metadata)); - Self::record_batches_to_pybytes(py, vec![batch], schema) + Self::record_batch_to_pybytes(py, batch, schema) } Err(e) => Err(to_pyvalue_err(e)), } } #[staticmethod] - pub fn pyo3_trade_ticks_to_batches_bytes( + pub fn pyo3_trade_ticks_to_record_batch_bytes( py: Python<'_>, data: Vec, ) -> PyResult> { @@ -274,14 +304,17 @@ impl DataTransformer { match result { Ok(batch) => { let schema = TradeTick::get_schema(Some(metadata)); - Self::record_batches_to_pybytes(py, vec![batch], schema) + Self::record_batch_to_pybytes(py, batch, schema) } Err(e) => Err(to_pyvalue_err(e)), } } #[staticmethod] - pub fn pyo3_bars_to_batches_bytes(py: Python<'_>, data: Vec) -> PyResult> { + pub fn pyo3_bars_to_record_batch_bytes( + py: Python<'_>, + data: Vec, + ) -> PyResult> { if data.is_empty() { return Err(to_pyvalue_err(ERROR_EMPTY_DATA)); } @@ -300,7 +333,7 @@ impl DataTransformer { match result { Ok(batch) => { let schema = Bar::get_schema(Some(metadata)); - Self::record_batches_to_pybytes(py, vec![batch], schema) + Self::record_batch_to_pybytes(py, batch, schema) } Err(e) => Err(to_pyvalue_err(e)), } diff --git a/nautilus_core/persistence/src/python/wranglers/bar.rs b/nautilus_core/persistence/src/python/wranglers/bar.rs index 4ade08fb04b9..14876aec93ea 100644 --- a/nautilus_core/persistence/src/python/wranglers/bar.rs +++ b/nautilus_core/persistence/src/python/wranglers/bar.rs 
@@ -60,7 +60,7 @@ impl BarDataWrangler { self.size_precision } - fn process_record_batches_bytes(&self, _py: Python, data: &[u8]) -> PyResult> { + fn process_record_batch_bytes(&self, _py: Python, data: &[u8]) -> PyResult> { // Create a StreamReader (from Arrow IPC) let cursor = Cursor::new(data); let reader = match StreamReader::try_new(cursor, None) { diff --git a/nautilus_core/persistence/src/python/wranglers/delta.rs b/nautilus_core/persistence/src/python/wranglers/delta.rs index fde89c6fead3..e123f1b38e0b 100644 --- a/nautilus_core/persistence/src/python/wranglers/delta.rs +++ b/nautilus_core/persistence/src/python/wranglers/delta.rs @@ -61,7 +61,7 @@ impl OrderBookDeltaDataWrangler { self.size_precision } - fn process_record_batches_bytes( + fn process_record_batch_bytes( &self, _py: Python, data: &[u8], diff --git a/nautilus_core/persistence/src/python/wranglers/quote.rs b/nautilus_core/persistence/src/python/wranglers/quote.rs index 35555d41e2c7..6b3a9aaabd0c 100644 --- a/nautilus_core/persistence/src/python/wranglers/quote.rs +++ b/nautilus_core/persistence/src/python/wranglers/quote.rs @@ -60,7 +60,7 @@ impl QuoteTickDataWrangler { self.size_precision } - fn process_record_batches_bytes(&self, _py: Python, data: &[u8]) -> PyResult> { + fn process_record_batch_bytes(&self, _py: Python, data: &[u8]) -> PyResult> { // Create a StreamReader (from Arrow IPC) let cursor = Cursor::new(data); let reader = match StreamReader::try_new(cursor, None) { diff --git a/nautilus_core/persistence/src/python/wranglers/trade.rs b/nautilus_core/persistence/src/python/wranglers/trade.rs index 37e9fef23c15..dbf9a1f54542 100644 --- a/nautilus_core/persistence/src/python/wranglers/trade.rs +++ b/nautilus_core/persistence/src/python/wranglers/trade.rs @@ -60,7 +60,7 @@ impl TradeTickDataWrangler { self.size_precision } - fn process_record_batches_bytes(&self, _py: Python, data: &[u8]) -> PyResult> { + fn process_record_batch_bytes(&self, _py: Python, data: &[u8]) -> PyResult> { // 
Create a StreamReader (from Arrow IPC) let cursor = Cursor::new(data); let reader = match StreamReader::try_new(cursor, None) { diff --git a/nautilus_trader/core/nautilus_pyo3.pyi b/nautilus_trader/core/nautilus_pyo3.pyi index 005a1d6112f7..a6f3af1fafc2 100644 --- a/nautilus_trader/core/nautilus_pyo3.pyi +++ b/nautilus_trader/core/nautilus_pyo3.pyi @@ -8,8 +8,6 @@ from decimal import Decimal from enum import Enum from typing import Any -from pyarrow import RecordBatch - from nautilus_trader.core.data import Data @@ -1280,17 +1278,17 @@ class DataTransformer: @staticmethod def get_schema_map(data_cls: type) -> dict[str, str]: ... @staticmethod - def pyobjects_to_batches_bytes(data: list[Data]) -> bytes: ... + def pyobjects_to_record_batch_bytes(data: list[Data]) -> bytes: ... @staticmethod - def pyo3_order_book_deltas_to_batches_bytes(data: list[OrderBookDelta]) -> bytes: ... + def pyo3_order_book_deltas_to_record_batch_bytes(data: list[OrderBookDelta]) -> bytes: ... @staticmethod - def pyo3_quote_ticks_to_batch_bytes(data: list[QuoteTick]) -> bytes: ... + def pyo3_order_book_depth10_to_record_batch_bytes(data: list[OrderBookDepth10]) -> bytes: ... @staticmethod - def pyo3_trade_ticks_to_batches_bytes(data: list[TradeTick]) -> bytes: ... + def pyo3_quote_ticks_to_record_batch_bytes(data: list[QuoteTick]) -> bytes: ... @staticmethod - def pyo3_bars_to_batches_bytes(data: list[Bar]) -> bytes: ... + def pyo3_trade_ticks_to_record_batch_bytes(data: list[TradeTick]) -> bytes: ... @staticmethod - def record_batches_to_pybytes(batches: list[RecordBatch], schema: Any) -> bytes: ... + def pyo3_bars_to_record_batch_bytes(data: list[Bar]) -> bytes: ... class BarDataWrangler: def __init__( @@ -1305,7 +1303,7 @@ class BarDataWrangler: def price_precision(self) -> int: ... @property def size_precision(self) -> int: ... - def process_record_batches_bytes(self, data: bytes) -> list[Bar]: ... + def process_record_batch_bytes(self, data: bytes) -> list[Bar]: ... 
class OrderBookDeltaDataWrangler: def __init__( @@ -1320,7 +1318,7 @@ class OrderBookDeltaDataWrangler: def price_precision(self) -> int: ... @property def size_precision(self) -> int: ... - def process_record_batches_bytes(self, data: bytes) -> list[OrderBookDelta]: ... + def process_record_batch_bytes(self, data: bytes) -> list[OrderBookDelta]: ... class QuoteTickDataWrangler: def __init__( @@ -1335,7 +1333,7 @@ class QuoteTickDataWrangler: def price_precision(self) -> int: ... @property def size_precision(self) -> int: ... - def process_record_batches_bytes(self, data: bytes) -> list[QuoteTick]: ... + def process_record_batch_bytes(self, data: bytes) -> list[QuoteTick]: ... class TradeTickDataWrangler: def __init__( @@ -1350,7 +1348,7 @@ class TradeTickDataWrangler: def price_precision(self) -> int: ... @property def size_precision(self) -> int: ... - def process_record_batches_bytes(self, data: bytes) -> list[TradeTick]: ... + def process_record_batch_bytes(self, data: bytes) -> list[TradeTick]: ... 
################################################################################################### diff --git a/nautilus_trader/model/data.pyx b/nautilus_trader/model/data.pyx index dacca1d3707d..2f60553a94d4 100644 --- a/nautilus_trader/model/data.pyx +++ b/nautilus_trader/model/data.pyx @@ -1016,25 +1016,27 @@ cdef class Bar(Data): """ cdef list output = [] - bar_type = None + pyo3_bar_type = None cdef uint8_t price_prec = 0 cdef uint8_t volume_prec = 0 cdef: Bar bar + BarType bar_type for bar in bars: - if bar_type is None: - bar_type = nautilus_pyo3.BarType.from_str(bar.bar_type.value) + if pyo3_bar_type is None: + bar_type = bar.bar_type + pyo3_bar_type = nautilus_pyo3.BarType.from_str(bar_type.to_str()) price_prec = bar._mem.open.precision volume_prec = bar._mem.volume.precision pyo3_bar = nautilus_pyo3.Bar( - bar_type, + pyo3_bar_type, nautilus_pyo3.Price.from_raw(bar._mem.open.raw, price_prec), nautilus_pyo3.Price.from_raw(bar._mem.high.raw, price_prec), - nautilus_pyo3.Price.from_raw_c(bar._mem.low.raw, price_prec), - nautilus_pyo3.Price.from_raw_c(bar._mem.close.raw, price_prec), - nautilus_pyo3.Quantity.from_raw_c(bar._mem.volume.raw, volume_prec), + nautilus_pyo3.Price.from_raw(bar._mem.low.raw, price_prec), + nautilus_pyo3.Price.from_raw(bar._mem.close.raw, price_prec), + nautilus_pyo3.Quantity.from_raw(bar._mem.volume.raw, volume_prec), bar._mem.ts_event, bar._mem.ts_init, ) @@ -1945,6 +1947,56 @@ cdef class OrderBookDelta(Data): """ return OrderBookDelta.clear_c(instrument_id, ts_event, ts_init, sequence) + @staticmethod + def to_pyo3_list(list[OrderBookDelta] deltas) -> list[nautilus_pyo3.OrderBookDelta]: + """ + Return pyo3 Rust order book deltas converted from the given legacy Cython objects. + + Parameters + ---------- + deltas : list[OrderBookDelta] + The legacy Cython order book deltas to convert from.
+ + Returns + ------- + list[nautilus_pyo3.OrderBookDelta] + + """ + cdef list output = [] + + pyo3_instrument_id = None + cdef uint8_t price_prec = 0 + cdef uint8_t size_prec = 0 + + cdef: + OrderBookDelta delta + BookOrder book_order + for delta in deltas: + if pyo3_instrument_id is None: + pyo3_instrument_id = nautilus_pyo3.InstrumentId.from_str(delta.instrument_id.value) + price_prec = delta.order.price.precision + size_prec = delta.order.size.precision + + pyo3_book_order = nautilus_pyo3.BookOrder( + nautilus_pyo3.OrderSide(order_side_to_str(delta._mem.order.side)), + nautilus_pyo3.Price.from_raw(delta._mem.order.price.raw, price_prec), + nautilus_pyo3.Quantity.from_raw(delta._mem.order.size.raw, size_prec), + delta._mem.order.order_id, + ) + + pyo3_delta = nautilus_pyo3.OrderBookDelta( + pyo3_instrument_id, + nautilus_pyo3.BookAction(book_action_to_str(delta._mem.action)), + pyo3_book_order, + delta._mem.flags, + delta._mem.sequence, + delta._mem.ts_event, + delta._mem.ts_init, + ) + output.append(pyo3_delta) + + return output + @staticmethod def from_pyo3_list(list pyo3_deltas) -> list[OrderBookDelta]: """ @@ -3280,7 +3332,7 @@ cdef class QuoteTick(Data): """ cdef list output = [] - instrument_id = None + pyo3_instrument_id = None cdef uint8_t bid_prec = 0 cdef uint8_t ask_prec = 0 cdef uint8_t bid_size_prec = 0 @@ -3289,15 +3341,15 @@ cdef class QuoteTick(Data): cdef: QuoteTick quote for quote in quotes: - if instrument_id is None: - instrument_id = nautilus_pyo3.InstrumentId.from_str(quote.instrument_id.value) + if pyo3_instrument_id is None: + pyo3_instrument_id = nautilus_pyo3.InstrumentId.from_str(quote.instrument_id.value) bid_prec = quote.bid_price.precision ask_prec = quote.ask_price.precision bid_size_prec = quote.bid_size.precision ask_size_prec = quote.ask_size.precision pyo3_quote = nautilus_pyo3.QuoteTick( - instrument_id, + pyo3_instrument_id, nautilus_pyo3.Price.from_raw(quote._mem.bid_price.raw, bid_prec), 
nautilus_pyo3.Price.from_raw(quote._mem.ask_price.raw, ask_prec), nautilus_pyo3.Quantity.from_raw(quote._mem.bid_size.raw, bid_size_prec), @@ -3717,6 +3769,48 @@ cdef class TradeTick(Data): """ return TradeTick.to_dict_c(obj) + @staticmethod + def to_pyo3_list(list[TradeTick] trades) -> list[nautilus_pyo3.TradeTick]: + """ + Return pyo3 Rust trade ticks converted from the given legacy Cython objects. + + Parameters + ---------- + trades : list[TradeTick] + The legacy Cython trade ticks to convert from. + + Returns + ------- + list[nautilus_pyo3.TradeTick] + + """ + cdef list output = [] + + pyo3_instrument_id = None + cdef uint8_t price_prec = 0 + cdef uint8_t size_prec = 0 + + cdef: + TradeTick trade + for trade in trades: + if pyo3_instrument_id is None: + pyo3_instrument_id = nautilus_pyo3.InstrumentId.from_str(trade.instrument_id.value) + price_prec = trade.price.precision + size_prec = trade.size.precision + + pyo3_trade = nautilus_pyo3.TradeTick( + pyo3_instrument_id, + nautilus_pyo3.Price.from_raw(trade._mem.price.raw, price_prec), + nautilus_pyo3.Quantity.from_raw(trade._mem.size.raw, size_prec), + nautilus_pyo3.AggressorSide(aggressor_side_to_str(trade._mem.aggressor_side)), + nautilus_pyo3.TradeId(trade.trade_id.value), + trade._mem.ts_event, + trade._mem.ts_init, + ) + output.append(pyo3_trade) + + return output + @staticmethod + def from_pyo3_list(list pyo3_ticks) -> list[TradeTick]: + """ diff --git a/nautilus_trader/persistence/wranglers_v2.py b/nautilus_trader/persistence/wranglers_v2.py index dccca884631a..a4f15c0654a1 100644 --- a/nautilus_trader/persistence/wranglers_v2.py +++ b/nautilus_trader/persistence/wranglers_v2.py @@ -92,7 +92,7 @@ def from_arrow( writer.close() data: bytes = sink.getvalue().to_pybytes() - return self._inner.process_record_batches_bytes(data) + return self._inner.process_record_batch_bytes(data) def from_pandas( self, @@ -192,7 +192,7 @@ def from_arrow( writer.close() data: bytes = sink.getvalue().to_pybytes() - return
self._inner.process_record_batches_bytes(data) + return self._inner.process_record_batch_bytes(data) def from_pandas( self, @@ -314,7 +314,7 @@ def from_arrow( writer.close() data: bytes = sink.getvalue().to_pybytes() - return self._inner.process_record_batches_bytes(data) + return self._inner.process_record_batch_bytes(data) def from_json( self, @@ -438,7 +438,7 @@ def from_arrow( writer.close() data = sink.getvalue().to_pybytes() - return self._inner.process_record_batches_bytes(data) + return self._inner.process_record_batch_bytes(data) def from_pandas( self, diff --git a/nautilus_trader/serialization/arrow/serializer.py b/nautilus_trader/serialization/arrow/serializer.py index d96a1555137c..0fdb83b5e229 100644 --- a/nautilus_trader/serialization/arrow/serializer.py +++ b/nautilus_trader/serialization/arrow/serializer.py @@ -115,14 +115,22 @@ def rust_objects_to_record_batch(data: list[Data], data_cls: type) -> pa.Table | data = sorted(data, key=lambda x: x.ts_init) processed = ArrowSerializer._unpack_container_objects(data_cls, data) - # TODO: WIP - Implement this for all types - if data_cls == QuoteTick: + if data_cls == OrderBookDelta: + pyo3_deltas = OrderBookDelta.to_pyo3_list(processed) + batch_bytes = DataTransformer.pyo3_order_book_deltas_to_record_batch_bytes(pyo3_deltas) + elif data_cls == QuoteTick: pyo3_quotes = QuoteTick.to_pyo3_list(processed) - batches_bytes = DataTransformer.pyo3_quote_ticks_to_batch_bytes(pyo3_quotes) + batch_bytes = DataTransformer.pyo3_quote_ticks_to_record_batch_bytes(pyo3_quotes) + elif data_cls == TradeTick: + pyo3_trades = TradeTick.to_pyo3_list(processed) + batch_bytes = DataTransformer.pyo3_trade_ticks_to_record_batch_bytes(pyo3_trades) + elif data_cls == Bar: + pyo3_bars = Bar.to_pyo3_list(processed) + batch_bytes = DataTransformer.pyo3_bars_to_record_batch_bytes(pyo3_bars) else: - batches_bytes = DataTransformer.pyobjects_to_batches_bytes(processed) + batch_bytes = 
DataTransformer.pyobjects_to_record_batch_bytes(processed) - reader = pa.ipc.open_stream(BytesIO(batches_bytes)) + reader = pa.ipc.open_stream(BytesIO(batch_bytes)) table: pa.Table = reader.read_all() return table diff --git a/tests/unit_tests/model/test_tick.py b/tests/unit_tests/model/test_tick.py index 74539cb5ecfd..041b29a52494 100644 --- a/tests/unit_tests/model/test_tick.py +++ b/tests/unit_tests/model/test_tick.py @@ -15,7 +15,6 @@ import pickle -from nautilus_trader.core import nautilus_pyo3 from nautilus_trader.model.data import QuoteTick from nautilus_trader.model.data import TradeTick from nautilus_trader.model.enums import AggressorSide @@ -26,8 +25,6 @@ from nautilus_trader.model.identifiers import Venue from nautilus_trader.model.objects import Price from nautilus_trader.model.objects import Quantity -from nautilus_trader.persistence.wranglers import QuoteTickDataWrangler -from nautilus_trader.test_kit.providers import TestDataProvider from nautilus_trader.test_kit.providers import TestInstrumentProvider @@ -197,22 +194,6 @@ def test_pickling_round_trip_results_in_expected_tick(self): # Assert assert tick == unpickled - def test_to_pyo3_list(self): - # Arrange - wrangler = QuoteTickDataWrangler(instrument=AUDUSD_SIM) - - quotes = wrangler.process( - data=TestDataProvider().read_csv_ticks("truefx/audusd-ticks.csv"), - default_volume=1_000_000, - ) - - # Act - pyo3_quotes = QuoteTick.to_pyo3_list(quotes) - - # Assert - assert len(pyo3_quotes) - assert isinstance(pyo3_quotes[0], nautilus_pyo3.QuoteTick) - class TestTradeTick: def test_fully_qualified_name(self): diff --git a/tests/unit_tests/persistence/test_transformer.py b/tests/unit_tests/persistence/test_transformer.py index b21d9df4be27..4cad3cfe7253 100644 --- a/tests/unit_tests/persistence/test_transformer.py +++ b/tests/unit_tests/persistence/test_transformer.py @@ -45,9 +45,9 @@ def test_pyo3_quote_ticks_to_record_batch_reader() -> None: ticks = wrangler.from_pandas(df) # Act - batches_bytes = 
DataTransformer.pyo3_quote_ticks_to_batch_bytes(ticks) - batches_stream = BytesIO(batches_bytes) - reader = pa.ipc.open_stream(batches_stream) + batch_bytes = DataTransformer.pyo3_quote_ticks_to_record_batch_bytes(ticks) + batch_stream = BytesIO(batch_bytes) + reader = pa.ipc.open_stream(batch_stream) # Assert assert len(ticks) == 100_000 @@ -62,9 +62,9 @@ def test_legacy_trade_ticks_to_record_batch_reader() -> None: ticks = wrangler.process(provider.read_csv_ticks("binance/ethusdt-trades.csv")) # Act - batches_bytes = DataTransformer.pyobjects_to_batches_bytes(ticks) - batches_stream = BytesIO(batches_bytes) - reader = pa.ipc.open_stream(batches_stream) + batch_bytes = DataTransformer.pyobjects_to_record_batch_bytes(ticks) + batch_stream = BytesIO(batch_bytes) + reader = pa.ipc.open_stream(batch_stream) # Assert assert len(ticks) == 69_806 @@ -95,9 +95,9 @@ def test_legacy_deltas_to_record_batch_reader() -> None: ] # Act - batches_bytes = DataTransformer.pyobjects_to_batches_bytes(ticks) - batches_stream = BytesIO(batches_bytes) - reader = pa.ipc.open_stream(batches_stream) + batch_bytes = DataTransformer.pyobjects_to_record_batch_bytes(ticks) + batch_stream = BytesIO(batch_bytes) + reader = pa.ipc.open_stream(batch_stream) # Assert assert len(ticks) == 1 diff --git a/tests/unit_tests/persistence/test_writing.py b/tests/unit_tests/persistence/test_writing.py index c23733ebe90c..d44f2666714b 100644 --- a/tests/unit_tests/persistence/test_writing.py +++ b/tests/unit_tests/persistence/test_writing.py @@ -44,9 +44,9 @@ def test_legacy_deltas_to_record_batch_reader() -> None: ] # Act - batches_bytes = DataTransformer.pyobjects_to_batches_bytes(ticks) - batches_stream = BytesIO(batches_bytes) - reader = pa.ipc.open_stream(batches_stream) + batch_bytes = DataTransformer.pyobjects_to_record_batch_bytes(ticks) + batch_stream = BytesIO(batch_bytes) + reader = pa.ipc.open_stream(batch_stream) # Assert assert len(ticks) == 1