From f1e80d205599eb2cdbb0de882f0bb9f3925a66eb Mon Sep 17 00:00:00 2001 From: Chris Sellers Date: Sat, 6 Jan 2024 10:22:20 +1100 Subject: [PATCH] Add OrderBookDepth10 to ParquetDataCatalog --- .../persistence/src/python/backend/session.rs | 14 +++++-- nautilus_trader/core/nautilus_pyo3.pyi | 7 ++-- .../persistence/catalog/parquet.py | 12 ++++-- tests/unit_tests/persistence/conftest.py | 6 +-- tests/unit_tests/persistence/test_backend.py | 37 ++++++++++--------- tests/unit_tests/persistence/test_catalog.py | 7 +--- 6 files changed, 46 insertions(+), 37 deletions(-) diff --git a/nautilus_core/persistence/src/python/backend/session.rs b/nautilus_core/persistence/src/python/backend/session.rs index 1d637eee822e..27f49f01b05a 100644 --- a/nautilus_core/persistence/src/python/backend/session.rs +++ b/nautilus_core/persistence/src/python/backend/session.rs @@ -14,7 +14,9 @@ // ------------------------------------------------------------------------------------------------- use nautilus_core::{ffi::cvec::CVec, python::to_pyruntime_err}; -use nautilus_model::data::{bar::Bar, delta::OrderBookDelta, quote::QuoteTick, trade::TradeTick}; +use nautilus_model::data::{ + bar::Bar, delta::OrderBookDelta, depth::OrderBookDepth10, quote::QuoteTick, trade::TradeTick, +}; use pyo3::{prelude::*, types::PyCapsule}; use crate::backend::session::{DataBackendSession, DataQueryResult}; @@ -25,9 +27,10 @@ use crate::backend::session::{DataBackendSession, DataQueryResult}; pub enum NautilusDataType { // Custom = 0, # First slot reserved for custom data OrderBookDelta = 1, - QuoteTick = 2, - TradeTick = 3, - Bar = 4, + OrderBookDepth10 = 2, + QuoteTick = 3, + TradeTick = 4, + Bar = 5, } #[pymethods] @@ -65,6 +68,9 @@ impl DataBackendSession { NautilusDataType::OrderBookDelta => slf .add_file::(table_name, file_path, sql_query) .map_err(to_pyruntime_err), + NautilusDataType::OrderBookDepth10 => slf + .add_file::(table_name, file_path, sql_query) + .map_err(to_pyruntime_err), NautilusDataType::QuoteTick => slf .add_file::(table_name, file_path, sql_query) .map_err(to_pyruntime_err), diff --git a/nautilus_trader/core/nautilus_pyo3.pyi b/nautilus_trader/core/nautilus_pyo3.pyi index 7afc1a6dbc84..279ee3161741 100644 --- a/nautilus_trader/core/nautilus_pyo3.pyi +++ b/nautilus_trader/core/nautilus_pyo3.pyi @@ -1251,9 +1251,10 @@ class SocketConfig: class NautilusDataType(Enum): OrderBookDelta = 1 - QuoteTick = 2 - TradeTick = 3 - Bar = 4 + OrderBookDepth10 = 2 + QuoteTick = 3 + TradeTick = 4 + Bar = 5 class DataBackendSession: def __init__(self, chunk_size: int = 5000) -> None: ... diff --git a/nautilus_trader/persistence/catalog/parquet.py b/nautilus_trader/persistence/catalog/parquet.py index 085937cf8ad1..0b1ea92105db 100644 --- a/nautilus_trader/persistence/catalog/parquet.py +++ b/nautilus_trader/persistence/catalog/parquet.py @@ -48,6 +48,7 @@ from nautilus_trader.model.data import GenericData from nautilus_trader.model.data import OrderBookDelta from nautilus_trader.model.data import OrderBookDeltas +from nautilus_trader.model.data import OrderBookDepth10 from nautilus_trader.model.data import QuoteTick from nautilus_trader.model.data import TradeTick from nautilus_trader.model.data import capsule_to_list @@ -351,7 +352,7 @@ def query( where: str | None = None, **kwargs: Any, ) -> list[Data | GenericData]: - if data_cls in (OrderBookDelta, QuoteTick, TradeTick, Bar): + if data_cls in (OrderBookDelta, OrderBookDepth10, QuoteTick, TradeTick, Bar): data = self.query_rust( data_cls=data_cls, instrument_ids=instrument_ids, @@ -542,6 +543,8 @@ def _build_query( def _nautilus_data_cls_to_data_type(data_cls: type) -> NautilusDataType: if data_cls in (OrderBookDelta, OrderBookDeltas): return NautilusDataType.OrderBookDelta + elif data_cls == OrderBookDepth10: + return NautilusDataType.OrderBookDepth10 elif data_cls == QuoteTick: return NautilusDataType.QuoteTick elif data_cls == TradeTick: @@ -549,7 +552,7 @@ def _nautilus_data_cls_to_data_type(data_cls: type) -> NautilusDataType: elif data_cls == Bar: return NautilusDataType.Bar else: - raise RuntimeError("unsupported `data_cls` for Rust parquet, was {data_cls.__name__}") + raise RuntimeError(f"unsupported `data_cls` for Rust parquet, was {data_cls.__name__}") @staticmethod def _handle_table_nautilus( @@ -563,10 +566,11 @@ def _handle_table_nautilus( module = data[0].__class__.__module__ if "builtins" in module: cython_cls = { - "OrderBookDeltas": OrderBookDelta, "OrderBookDelta": OrderBookDelta, - "TradeTick": TradeTick, + "OrderBookDeltas": OrderBookDelta, + "OrderBookDepth10": OrderBookDepth10, "QuoteTick": QuoteTick, + "TradeTick": TradeTick, "Bar": Bar, }.get(data_cls.__name__, data_cls.__name__) data = cython_cls.from_pyo3(data) diff --git a/tests/unit_tests/persistence/conftest.py b/tests/unit_tests/persistence/conftest.py index 4befd879f8c5..064fd05fcdc3 100644 --- a/tests/unit_tests/persistence/conftest.py +++ b/tests/unit_tests/persistence/conftest.py @@ -18,18 +18,18 @@ from nautilus_trader.adapters.betfair.parsing.core import betting_instruments_from_file from nautilus_trader.adapters.betfair.parsing.core import parse_betfair_file from nautilus_trader.persistence.catalog.parquet import ParquetDataCatalog -from nautilus_trader.test_kit.mocks.data import data_catalog_setup +from nautilus_trader.test_kit.mocks.data import setup_catalog from tests import TEST_DATA_DIR @pytest.fixture(name="memory_data_catalog") def fixture_memory_data_catalog() -> ParquetDataCatalog: - return data_catalog_setup(protocol="memory") + return setup_catalog(protocol="memory") @pytest.fixture(name="data_catalog") def fixture_data_catalog() -> ParquetDataCatalog: - return data_catalog_setup(protocol="file") + return setup_catalog(protocol="file") @pytest.fixture(name="betfair_catalog") diff --git a/tests/unit_tests/persistence/test_backend.py b/tests/unit_tests/persistence/test_backend.py index 47b56167bc8c..708fe35f131b 100644 --- a/tests/unit_tests/persistence/test_backend.py +++ b/tests/unit_tests/persistence/test_backend.py @@ -13,22 +13,21 @@ # limitations under the License. # ------------------------------------------------------------------------------------------------- -import os +from pathlib import Path import pandas as pd -from nautilus_trader import PACKAGE_ROOT from nautilus_trader.core.nautilus_pyo3 import DataBackendSession from nautilus_trader.core.nautilus_pyo3 import NautilusDataType from nautilus_trader.model.data import capsule_to_list +from tests import TEST_DATA_DIR -def test_backend_session_order_book() -> None: +def test_backend_session_order_book_deltas() -> None: # Arrange - parquet_data_path = os.path.join(PACKAGE_ROOT, "tests/test_data/order_book_deltas.parquet") - assert pd.read_parquet(parquet_data_path).shape[0] == 1077 + data_path = Path(TEST_DATA_DIR) / "order_book_deltas.parquet" session = DataBackendSession() - session.add_file(NautilusDataType.OrderBookDelta, "order_book_deltas", parquet_data_path) + session.add_file(NautilusDataType.OrderBookDelta, "order_book_deltas", str(data_path)) # Act result = session.to_query_result() @@ -38,6 +37,7 @@ def test_backend_session_order_book() -> None: ticks.extend(capsule_to_list(chunk)) # Assert + assert pd.read_parquet(data_path).shape[0] == 1077 assert len(ticks) == 1077 is_ascending = all(ticks[i].ts_init <= ticks[i].ts_init for i in range(len(ticks) - 1)) assert is_ascending @@ -45,9 +45,9 @@ def test_backend_session_order_book() -> None: def test_backend_session_quotes() -> None: # Arrange - parquet_data_path = os.path.join(PACKAGE_ROOT, "tests/test_data/quote_tick_data.parquet") + data_path = Path(TEST_DATA_DIR) / "quote_tick_data.parquet" session = DataBackendSession() - session.add_file(NautilusDataType.QuoteTick, "quote_ticks", parquet_data_path) + session.add_file(NautilusDataType.QuoteTick, "quote_ticks", str(data_path)) # Act result = session.to_query_result() @@ -57,7 +57,7 @@ def test_backend_session_quotes() -> None: ticks.extend(capsule_to_list(chunk)) # Assert - assert len(ticks) == 9500 + assert len(ticks) == 9_500 assert str(ticks[-1]) == "EUR/USD.SIM,1.12130,1.12132,0,0,1577919652000000125" is_ascending = all(ticks[i].ts_init <= ticks[i].ts_init for i in range(len(ticks) - 1)) assert is_ascending @@ -65,9 +65,9 @@ def test_backend_session_quotes() -> None: def test_backend_session_trades() -> None: # Arrange - trades_path = os.path.join(PACKAGE_ROOT, "tests/test_data/trade_tick_data.parquet") + data_path = Path(TEST_DATA_DIR) / "trade_tick_data.parquet" session = DataBackendSession() - session.add_file(NautilusDataType.TradeTick, "trade_ticks", trades_path) + session.add_file(NautilusDataType.TradeTick, "trade_ticks", str(data_path)) # Act result = session.to_query_result() @@ -84,9 +84,9 @@ def test_backend_session_trades() -> None: def test_backend_session_bars() -> None: # Arrange - trades_path = os.path.join(PACKAGE_ROOT, "tests/test_data/bar_data.parquet") + data_path = Path(TEST_DATA_DIR) / "bar_data.parquet" session = DataBackendSession() - session.add_file(NautilusDataType.Bar, "bars_01", trades_path) + session.add_file(NautilusDataType.Bar, "bars_01", str(data_path)) # Act result = session.to_query_result() @@ -103,11 +103,12 @@ def test_backend_session_bars() -> None: def test_backend_session_multiple_types() -> None: # Arrange - trades_path = os.path.join(PACKAGE_ROOT, "tests/test_data/trade_tick_data.parquet") - quotes_path = os.path.join(PACKAGE_ROOT, "tests/test_data/quote_tick_data.parquet") + trades_path = Path(TEST_DATA_DIR) / "trade_tick_data.parquet" + quotes_path = Path(TEST_DATA_DIR) / "quote_tick_data.parquet" + session = DataBackendSession() - session.add_file(NautilusDataType.TradeTick, "trades_01", trades_path) - session.add_file(NautilusDataType.QuoteTick, "quotes_01", quotes_path) + session.add_file(NautilusDataType.TradeTick, "trades_01", str(trades_path)) + session.add_file(NautilusDataType.QuoteTick, "quotes_01", str(quotes_path)) # Act result = session.to_query_result() @@ -117,6 +118,6 @@ def test_backend_session_multiple_types() -> None: ticks.extend(capsule_to_list(chunk)) # Assert - assert len(ticks) == 9600 + assert len(ticks) == 9_600 is_ascending = all(ticks[i].ts_init <= ticks[i].ts_init for i in range(len(ticks) - 1)) assert is_ascending diff --git a/tests/unit_tests/persistence/test_catalog.py b/tests/unit_tests/persistence/test_catalog.py index 126c18b3be0c..e6a565b12826 100644 --- a/tests/unit_tests/persistence/test_catalog.py +++ b/tests/unit_tests/persistence/test_catalog.py @@ -16,7 +16,6 @@ import datetime import sys from decimal import Decimal -from typing import ClassVar import fsspec import pyarrow.dataset as ds @@ -38,17 +37,15 @@ from nautilus_trader.model.objects import Quantity from nautilus_trader.persistence.catalog.parquet import ParquetDataCatalog from nautilus_trader.test_kit.mocks.data import NewsEventData -from nautilus_trader.test_kit.mocks.data import data_catalog_setup +from nautilus_trader.test_kit.mocks.data import setup_catalog from nautilus_trader.test_kit.providers import TestInstrumentProvider from nautilus_trader.test_kit.stubs.data import TestDataStubs from nautilus_trader.test_kit.stubs.persistence import TestPersistenceStubs class TestPersistenceCatalog: - FS_PROTOCOL: ClassVar["str"] = "file" - def setup(self) -> None: - self.catalog = data_catalog_setup(protocol=self.FS_PROTOCOL) + self.catalog = setup_catalog(protocol="file") self.fs: fsspec.AbstractFileSystem = self.catalog.fs def test_list_data_types(self, betfair_catalog: ParquetDataCatalog) -> None: