Skip to content

Commit

Permalink
Add OrderBookDepth10 to ParquetDataCatalog
Browse files: browse the repository at this point in the history
  • Loading branch information
cjdsellers committed Jan 5, 2024
1 parent d40ba1e commit f1e80d2
Show file tree
Hide file tree
Showing 6 changed files with 46 additions and 37 deletions.
14 changes: 10 additions & 4 deletions nautilus_core/persistence/src/python/backend/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@
// -------------------------------------------------------------------------------------------------

use nautilus_core::{ffi::cvec::CVec, python::to_pyruntime_err};
use nautilus_model::data::{bar::Bar, delta::OrderBookDelta, quote::QuoteTick, trade::TradeTick};
use nautilus_model::data::{
bar::Bar, delta::OrderBookDelta, depth::OrderBookDepth10, quote::QuoteTick, trade::TradeTick,
};
use pyo3::{prelude::*, types::PyCapsule};

use crate::backend::session::{DataBackendSession, DataQueryResult};
Expand All @@ -25,9 +27,10 @@ use crate::backend::session::{DataBackendSession, DataQueryResult};
pub enum NautilusDataType {
// Custom = 0, # First slot reserved for custom data
OrderBookDelta = 1,
QuoteTick = 2,
TradeTick = 3,
Bar = 4,
OrderBookDepth10 = 2,
QuoteTick = 3,
TradeTick = 4,
Bar = 5,
}

#[pymethods]
Expand Down Expand Up @@ -65,6 +68,9 @@ impl DataBackendSession {
NautilusDataType::OrderBookDelta => slf
.add_file::<OrderBookDelta>(table_name, file_path, sql_query)
.map_err(to_pyruntime_err),
NautilusDataType::OrderBookDepth10 => slf
.add_file::<OrderBookDepth10>(table_name, file_path, sql_query)
.map_err(to_pyruntime_err),
NautilusDataType::QuoteTick => slf
.add_file::<QuoteTick>(table_name, file_path, sql_query)
.map_err(to_pyruntime_err),
Expand Down
7 changes: 4 additions & 3 deletions nautilus_trader/core/nautilus_pyo3.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -1251,9 +1251,10 @@ class SocketConfig:

class NautilusDataType(Enum):
OrderBookDelta = 1
QuoteTick = 2
TradeTick = 3
Bar = 4
OrderBookDepth10 = 2
QuoteTick = 3
TradeTick = 4
Bar = 5

class DataBackendSession:
def __init__(self, chunk_size: int = 5000) -> None: ...
Expand Down
12 changes: 8 additions & 4 deletions nautilus_trader/persistence/catalog/parquet.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
from nautilus_trader.model.data import GenericData
from nautilus_trader.model.data import OrderBookDelta
from nautilus_trader.model.data import OrderBookDeltas
from nautilus_trader.model.data import OrderBookDepth10
from nautilus_trader.model.data import QuoteTick
from nautilus_trader.model.data import TradeTick
from nautilus_trader.model.data import capsule_to_list
Expand Down Expand Up @@ -351,7 +352,7 @@ def query(
where: str | None = None,
**kwargs: Any,
) -> list[Data | GenericData]:
if data_cls in (OrderBookDelta, QuoteTick, TradeTick, Bar):
if data_cls in (OrderBookDelta, OrderBookDepth10, QuoteTick, TradeTick, Bar):
data = self.query_rust(
data_cls=data_cls,
instrument_ids=instrument_ids,
Expand Down Expand Up @@ -542,14 +543,16 @@ def _build_query(
def _nautilus_data_cls_to_data_type(data_cls: type) -> NautilusDataType:
if data_cls in (OrderBookDelta, OrderBookDeltas):
return NautilusDataType.OrderBookDelta
elif data_cls == OrderBookDepth10:
return NautilusDataType.OrderBookDepth10
elif data_cls == QuoteTick:
return NautilusDataType.QuoteTick
elif data_cls == TradeTick:
return NautilusDataType.TradeTick
elif data_cls == Bar:
return NautilusDataType.Bar
else:
raise RuntimeError("unsupported `data_cls` for Rust parquet, was {data_cls.__name__}")
raise RuntimeError(f"unsupported `data_cls` for Rust parquet, was {data_cls.__name__}")

@staticmethod
def _handle_table_nautilus(
Expand All @@ -563,10 +566,11 @@ def _handle_table_nautilus(
module = data[0].__class__.__module__
if "builtins" in module:
cython_cls = {
"OrderBookDeltas": OrderBookDelta,
"OrderBookDelta": OrderBookDelta,
"TradeTick": TradeTick,
"OrderBookDeltas": OrderBookDelta,
"OrderBookDepth10": OrderBookDepth10,
"QuoteTick": QuoteTick,
"TradeTick": TradeTick,
"Bar": Bar,
}.get(data_cls.__name__, data_cls.__name__)
data = cython_cls.from_pyo3(data)
Expand Down
6 changes: 3 additions & 3 deletions tests/unit_tests/persistence/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,18 @@
from nautilus_trader.adapters.betfair.parsing.core import betting_instruments_from_file
from nautilus_trader.adapters.betfair.parsing.core import parse_betfair_file
from nautilus_trader.persistence.catalog.parquet import ParquetDataCatalog
from nautilus_trader.test_kit.mocks.data import data_catalog_setup
from nautilus_trader.test_kit.mocks.data import setup_catalog
from tests import TEST_DATA_DIR


@pytest.fixture(name="memory_data_catalog")
def fixture_memory_data_catalog() -> ParquetDataCatalog:
return data_catalog_setup(protocol="memory")
return setup_catalog(protocol="memory")


@pytest.fixture(name="data_catalog")
def fixture_data_catalog() -> ParquetDataCatalog:
return data_catalog_setup(protocol="file")
return setup_catalog(protocol="file")


@pytest.fixture(name="betfair_catalog")
Expand Down
37 changes: 19 additions & 18 deletions tests/unit_tests/persistence/test_backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,22 +13,21 @@
# limitations under the License.
# -------------------------------------------------------------------------------------------------

import os
from pathlib import Path

import pandas as pd

from nautilus_trader import PACKAGE_ROOT
from nautilus_trader.core.nautilus_pyo3 import DataBackendSession
from nautilus_trader.core.nautilus_pyo3 import NautilusDataType
from nautilus_trader.model.data import capsule_to_list
from tests import TEST_DATA_DIR


def test_backend_session_order_book() -> None:
def test_backend_session_order_book_deltas() -> None:
# Arrange
parquet_data_path = os.path.join(PACKAGE_ROOT, "tests/test_data/order_book_deltas.parquet")
assert pd.read_parquet(parquet_data_path).shape[0] == 1077
data_path = Path(TEST_DATA_DIR) / "order_book_deltas.parquet"
session = DataBackendSession()
session.add_file(NautilusDataType.OrderBookDelta, "order_book_deltas", parquet_data_path)
session.add_file(NautilusDataType.OrderBookDelta, "order_book_deltas", str(data_path))

# Act
result = session.to_query_result()
Expand All @@ -38,16 +37,17 @@ def test_backend_session_order_book() -> None:
ticks.extend(capsule_to_list(chunk))

# Assert
assert pd.read_parquet(data_path).shape[0] == 1077
assert len(ticks) == 1077
is_ascending = all(ticks[i].ts_init <= ticks[i].ts_init for i in range(len(ticks) - 1))
assert is_ascending


def test_backend_session_quotes() -> None:
# Arrange
parquet_data_path = os.path.join(PACKAGE_ROOT, "tests/test_data/quote_tick_data.parquet")
data_path = Path(TEST_DATA_DIR) / "quote_tick_data.parquet"
session = DataBackendSession()
session.add_file(NautilusDataType.QuoteTick, "quote_ticks", parquet_data_path)
session.add_file(NautilusDataType.QuoteTick, "quote_ticks", str(data_path))

# Act
result = session.to_query_result()
Expand All @@ -57,17 +57,17 @@ def test_backend_session_quotes() -> None:
ticks.extend(capsule_to_list(chunk))

# Assert
assert len(ticks) == 9500
assert len(ticks) == 9_500
assert str(ticks[-1]) == "EUR/USD.SIM,1.12130,1.12132,0,0,1577919652000000125"
is_ascending = all(ticks[i].ts_init <= ticks[i].ts_init for i in range(len(ticks) - 1))
assert is_ascending


def test_backend_session_trades() -> None:
# Arrange
trades_path = os.path.join(PACKAGE_ROOT, "tests/test_data/trade_tick_data.parquet")
data_path = Path(TEST_DATA_DIR) / "trade_tick_data.parquet"
session = DataBackendSession()
session.add_file(NautilusDataType.TradeTick, "trade_ticks", trades_path)
session.add_file(NautilusDataType.TradeTick, "trade_ticks", str(data_path))

# Act
result = session.to_query_result()
Expand All @@ -84,9 +84,9 @@ def test_backend_session_trades() -> None:

def test_backend_session_bars() -> None:
# Arrange
trades_path = os.path.join(PACKAGE_ROOT, "tests/test_data/bar_data.parquet")
data_path = Path(TEST_DATA_DIR) / "bar_data.parquet"
session = DataBackendSession()
session.add_file(NautilusDataType.Bar, "bars_01", trades_path)
session.add_file(NautilusDataType.Bar, "bars_01", str(data_path))

# Act
result = session.to_query_result()
Expand All @@ -103,11 +103,12 @@ def test_backend_session_bars() -> None:

def test_backend_session_multiple_types() -> None:
# Arrange
trades_path = os.path.join(PACKAGE_ROOT, "tests/test_data/trade_tick_data.parquet")
quotes_path = os.path.join(PACKAGE_ROOT, "tests/test_data/quote_tick_data.parquet")
trades_path = Path(TEST_DATA_DIR) / "trade_tick_data.parquet"
quotes_path = Path(TEST_DATA_DIR) / "quote_tick_data.parquet"

session = DataBackendSession()
session.add_file(NautilusDataType.TradeTick, "trades_01", trades_path)
session.add_file(NautilusDataType.QuoteTick, "quotes_01", quotes_path)
session.add_file(NautilusDataType.TradeTick, "trades_01", str(trades_path))
session.add_file(NautilusDataType.QuoteTick, "quotes_01", str(quotes_path))

# Act
result = session.to_query_result()
Expand All @@ -117,6 +118,6 @@ def test_backend_session_multiple_types() -> None:
ticks.extend(capsule_to_list(chunk))

# Assert
assert len(ticks) == 9600
assert len(ticks) == 9_600
is_ascending = all(ticks[i].ts_init <= ticks[i].ts_init for i in range(len(ticks) - 1))
assert is_ascending
7 changes: 2 additions & 5 deletions tests/unit_tests/persistence/test_catalog.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
import datetime
import sys
from decimal import Decimal
from typing import ClassVar

import fsspec
import pyarrow.dataset as ds
Expand All @@ -38,17 +37,15 @@
from nautilus_trader.model.objects import Quantity
from nautilus_trader.persistence.catalog.parquet import ParquetDataCatalog
from nautilus_trader.test_kit.mocks.data import NewsEventData
from nautilus_trader.test_kit.mocks.data import data_catalog_setup
from nautilus_trader.test_kit.mocks.data import setup_catalog
from nautilus_trader.test_kit.providers import TestInstrumentProvider
from nautilus_trader.test_kit.stubs.data import TestDataStubs
from nautilus_trader.test_kit.stubs.persistence import TestPersistenceStubs


class TestPersistenceCatalog:
FS_PROTOCOL: ClassVar["str"] = "file"

def setup(self) -> None:
self.catalog = data_catalog_setup(protocol=self.FS_PROTOCOL)
self.catalog = setup_catalog(protocol="file")
self.fs: fsspec.AbstractFileSystem = self.catalog.fs

def test_list_data_types(self, betfair_catalog: ParquetDataCatalog) -> None:
Expand Down

0 comments on commit f1e80d2

Please sign in to comment.