Skip to content

Commit

Permalink
Continue Databento loading and parsing in Rust
Browse files Browse the repository at this point in the history
  • Loading branch information
cjdsellers committed Jan 14, 2024
1 parent 0acf0bc commit 2be89d7
Show file tree
Hide file tree
Showing 6 changed files with 336 additions and 513 deletions.
20 changes: 10 additions & 10 deletions nautilus_core/adapters/src/databento/parsing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ use nautilus_model::{
equity::Equity, futures_contract::FuturesContract, options_contract::OptionsContract,
Instrument,
},
types::{currency::Currency, price::Price, quantity::Quantity},
types::{currency::Currency, fixed::FIXED_SCALAR, price::Price, quantity::Quantity},
};
use ustr::Ustr;

Expand Down Expand Up @@ -276,7 +276,7 @@ pub fn parse_mbo_msg(
let trade = TradeTick::new(
instrument_id,
Price::from_raw(record.price, price_precision)?,
Quantity::from_raw(record.size.into(), 0)?,
Quantity::from_raw(record.size as u64 * FIXED_SCALAR as u64, 0)?,
parse_aggressor_side(record.side),
TradeId::new(itoa::Buffer::new().format(record.sequence))?,
record.ts_recv,
Expand All @@ -288,7 +288,7 @@ pub fn parse_mbo_msg(
let order = BookOrder::new(
side,
Price::from_raw(record.price, price_precision)?,
Quantity::from_raw(record.size.into(), 0)?,
Quantity::from_raw(record.size as u64 * FIXED_SCALAR as u64, 0)?,
record.order_id,
);

Expand All @@ -314,7 +314,7 @@ pub fn parse_trade_msg(
let trade = TradeTick::new(
instrument_id,
Price::from_raw(record.price, price_precision)?,
Quantity::from_raw(record.size.into(), 0)?,
Quantity::from_raw(record.size as u64 * FIXED_SCALAR as u64, 0)?,
parse_aggressor_side(record.side),
TradeId::new(itoa::Buffer::new().format(record.sequence))?,
record.ts_recv,
Expand All @@ -335,8 +335,8 @@ pub fn parse_mbp1_msg(
instrument_id,
Price::from_raw(top_level.bid_px, price_precision)?,
Price::from_raw(top_level.ask_px, price_precision)?,
Quantity::from_raw(top_level.bid_sz.into(), 0)?,
Quantity::from_raw(top_level.ask_sz.into(), 0)?,
Quantity::from_raw(top_level.bid_sz as u64 * FIXED_SCALAR as u64, 0)?,
Quantity::from_raw(top_level.ask_sz as u64 * FIXED_SCALAR as u64, 0)?,
record.ts_recv,
ts_init,
)?;
Expand All @@ -345,7 +345,7 @@ pub fn parse_mbp1_msg(
'T' => Some(TradeTick::new(
instrument_id,
Price::from_raw(record.price, price_precision)?,
Quantity::from_raw(record.size.into(), 0)?,
Quantity::from_raw(record.size as u64 * FIXED_SCALAR as u64, 0)?,
parse_aggressor_side(record.side),
TradeId::new(itoa::Buffer::new().format(record.sequence))?,
record.ts_recv,
Expand All @@ -372,14 +372,14 @@ pub fn parse_mbp10_msg(
let bid_order = BookOrder::new(
OrderSide::Buy,
Price::from_raw(level.bid_px, price_precision)?,
Quantity::from_raw(level.bid_sz.into(), 0)?,
Quantity::from_raw(level.bid_sz as u64 * FIXED_SCALAR as u64, 0)?,
0,
);

let ask_order = BookOrder::new(
OrderSide::Sell,
Price::from_raw(level.ask_px, price_precision)?,
Quantity::from_raw(level.ask_sz.into(), 0)?,
Quantity::from_raw(level.ask_sz as u64 * FIXED_SCALAR as u64, 0)?,
0,
);

Expand Down Expand Up @@ -482,7 +482,7 @@ pub fn parse_ohlcv_msg(
Price::from_raw(record.high / 100, price_precision)?, // TODO(adjust for display factor)
Price::from_raw(record.low / 100, price_precision)?, // TODO(adjust for display factor)
Price::from_raw(record.close / 100, price_precision)?, // TODO(adjust for display factor)
Quantity::from_raw(record.volume, 0)?, // TODO(adjust for display factor)
Quantity::from_raw(record.volume * FIXED_SCALAR as u64, 0)?, // TODO(adjust for display factor)
ts_event,
ts_init,
);
Expand Down
182 changes: 36 additions & 146 deletions nautilus_trader/adapters/databento/loaders.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,17 @@
from pathlib import Path

import databento
import databento_dbn
import msgspec

from nautilus_trader.adapters.databento.common import check_file_path
from nautilus_trader.adapters.databento.parsing import parse_record
from nautilus_trader.adapters.databento.parsing import parse_record_with_metadata
from nautilus_trader.adapters.databento.types import DatabentoPublisher
from nautilus_trader.core import nautilus_pyo3
from nautilus_trader.core.correctness import PyCondition
from nautilus_trader.core.data import Data
from nautilus_trader.model.data import Bar
from nautilus_trader.model.data import OrderBookDelta
from nautilus_trader.model.data import OrderBookDepth10
from nautilus_trader.model.data import QuoteTick
from nautilus_trader.model.data import TradeTick
from nautilus_trader.model.identifiers import InstrumentId
from nautilus_trader.model.identifiers import Venue
from nautilus_trader.model.instruments import Instrument
Expand Down Expand Up @@ -95,22 +96,6 @@ def publishers(self) -> dict[int, DatabentoPublisher]:
"""
return self._publishers

@property
def instruments(self) -> dict[InstrumentId, Instrument]:
"""
Return the internal Nautilus instruments currently held by the loader.
Returns
-------
dict[InstrumentId, Instrument]
Notes
-----
Returns a copy of the internal dictionary.
"""
return self._instruments

def get_dataset_for_venue(self, venue: Venue) -> str:
"""
Return a dataset for the given `venue`.
Expand Down Expand Up @@ -155,122 +140,11 @@ def load_publishers(self, path: PathLike[str] | str) -> None:
self._publishers = {p.publisher_id: p for p in publishers}
self._venue_dataset: dict[Venue, str] = {Venue(p.venue): p.dataset for p in publishers}

def load_instruments(self, path: PathLike[str] | str) -> None:
"""
Load instrument definitions from the DBN file at the given path.
Parameters
----------
path : PathLike[str] | str
The path for the instruments data to load.
"""
path = Path(path)
check_file_path(path)

instruments = self.from_dbn(path)

PyCondition.not_empty(instruments, "instruments")
PyCondition.type(instruments[0], Instrument, "instruments")

self._instruments = {i.id: i for i in instruments}

def add_instruments(self, instrument: Instrument | list[Instrument]) -> None:
"""
Add the given `instrument`(s) for use by the loader.
Parameters
----------
instrument : Instrument | list[Instrument]
The Nautilus instrument(s) to add.
Warnings
--------
Will overwrite any existing instrument(s) with the same Nautilus instrument ID(s).
"""
if not isinstance(instrument, list):
instruments = [instrument]
else:
instruments = instrument

for inst in instruments:
self._instruments[inst.id] = inst

def from_dbn(
self,
path: PathLike[str] | str,
instrument_id: InstrumentId | None = None,
) -> list[Data]:
"""
Return a list of Nautilus objects decoded from the DBN file at the given `path`.
Parameters
----------
path : PathLike[str] | str
The path for the data.
instrument_id : InstrumentId, optional
The Nautilus instrument ID for the data. This is a parameter to optimize performance,
as all records will have their symbology overridden with the given Nautilus identifier.
Returns
-------
list[Data]
Raises
------
FileNotFoundError
If a non-existent file is specified.
ValueError
If an empty file is specified.
"""
store = databento.from_dbn(path)
instrument_map = databento.InstrumentMap()
instrument_map.insert_metadata(metadata=store.metadata)

output: list[Data] = []

for record in store:
if isinstance(
record,
databento.ErrorMsg
| databento.SystemMsg
| databento.SymbolMappingMsg
| databento_dbn.SymbolMappingMsgV1,
):
continue

if isinstance(record, databento.OHLCVMsg):
ts_init = record.ts_event
else:
ts_init = record.ts_recv

if instrument_id is not None:
data = parse_record(
record=record,
instrument_id=instrument_id,
ts_init=ts_init,
)
else:
data = parse_record_with_metadata(
record=record,
publishers=self._publishers,
instrument_map=instrument_map,
ts_init=ts_init,
)

if isinstance(data, tuple):
output.extend(data)
else:
output.append(data)

return output

def load_from_file_pyo3(
self,
path: PathLike[str] | str,
instrument_id: InstrumentId | None = None,
as_legacy_cython: bool = False,
) -> list[Data]:
"""
Return a list of pyo3 data objects decoded from the DBN file at the given
Expand All @@ -285,6 +159,8 @@ def load_from_file_pyo3(
as all records will have their symbology overridden with the given Nautilus identifier.
This option should only be used if the instrument ID is definitely know (for instance
if all records in a file are guarantted to be for the same instrument).
as_legacy_cython : bool, False
If data should be converted to 'legacy Cython' objects.
Returns
-------
Expand Down Expand Up @@ -313,24 +189,38 @@ def load_from_file_pyo3(

match schema:
case databento.Schema.DEFINITION:
# TODO: pyo3 -> Cython conversion
return self._pyo3_loader.load_instruments(path) # type: ignore
case databento.Schema.MBO:
return self._pyo3_loader.load_order_book_deltas(path, pyo3_instrument_id) # type: ignore
data = self._pyo3_loader.load_order_book_deltas(path, pyo3_instrument_id) # type: ignore
if as_legacy_cython:
data = OrderBookDelta.from_pyo3_list(data)
return data
case databento.Schema.MBP_1 | databento.Schema.TBBO:
return self._pyo3_loader.load_quote_ticks(path, pyo3_instrument_id) # type: ignore
data = self._pyo3_loader.load_quote_ticks(path, pyo3_instrument_id) # type: ignore
if as_legacy_cython:
data = QuoteTick.from_pyo3_list(data)
return data
case databento.Schema.MBP_10:
return self._pyo3_loader.load_order_book_depth10(path) # type: ignore
data = self._pyo3_loader.load_order_book_depth10(path) # type: ignore
if as_legacy_cython:
data = OrderBookDepth10.from_pyo3_list(data)
return data
case databento.Schema.TRADES:
return self._pyo3_loader.load_trade_ticks(path, pyo3_instrument_id) # type: ignore
case databento.Schema.OHLCV_1S:
return self._pyo3_loader.load_bars(path, pyo3_instrument_id) # type: ignore
case databento.Schema.OHLCV_1M:
return self._pyo3_loader.load_bars(path, pyo3_instrument_id) # type: ignore
case databento.Schema.OHLCV_1H:
return self._pyo3_loader.load_bars(path, pyo3_instrument_id) # type: ignore
case databento.Schema.OHLCV_1D:
return self._pyo3_loader.load_bars(path, pyo3_instrument_id) # type: ignore
case databento.Schema.OHLCV_EOD:
return self._pyo3_loader.load_bars(path, pyo3_instrument_id) # type: ignore
data = self._pyo3_loader.load_trade_ticks(path, pyo3_instrument_id) # type: ignore
if as_legacy_cython:
data = TradeTick.from_pyo3_list(data)
return data
case (
databento.Schema.OHLCV_1S
| databento.Schema.OHLCV_1M
| databento.Schema.OHLCV_1H
| databento.Schema.OHLCV_1D
| databento.Schema.OHLCV_EOD
):
data = self._pyo3_loader.load_bars(path, pyo3_instrument_id) # type: ignore
if as_legacy_cython:
data = Bar.from_pyo3_list(data)
return data
case _:
raise RuntimeError(f"Loading schema {schema} not currently supported")
20 changes: 9 additions & 11 deletions nautilus_trader/model/data.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -2027,21 +2027,19 @@ cdef class OrderBookDelta(Data):
price_prec = pyo3_delta.order.price.precision
size_prec = pyo3_delta.order.size.precision

book_order = BookOrder(
pyo3_delta.order.side.value,
Price.from_raw_c(pyo3_delta.order.price.raw, price_prec),
Quantity.from_raw_c(pyo3_delta.order.size.raw, size_prec),
pyo3_delta.order.order_id,
)

delta = OrderBookDelta(
delta = OrderBookDelta.from_raw_c(
instrument_id,
pyo3_delta.action.value,
book_order,
pyo3_delta.ts_event,
pyo3_delta.ts_init,
pyo3_delta.order.side.value,
pyo3_delta.order.price.raw,
price_prec,
pyo3_delta.order.size.raw,
size_prec,
pyo3_delta.order.order_id,
pyo3_delta.flags,
pyo3_delta.sequence,
pyo3_delta.ts_event,
pyo3_delta.ts_init,
)
output.append(delta)

Expand Down
Loading

0 comments on commit 2be89d7

Please sign in to comment.