diff --git a/nautilus_core/Cargo.lock b/nautilus_core/Cargo.lock index 0b014c40ca88..40c716b95f8b 100644 --- a/nautilus_core/Cargo.lock +++ b/nautilus_core/Cargo.lock @@ -2446,7 +2446,11 @@ dependencies = [ "rstest", "rust_decimal", "rust_decimal_macros", + "serde", + "serde_json", + "streaming-iterator", "thiserror", + "time", "tokio", "ustr", ] @@ -3702,9 +3706,9 @@ dependencies = [ [[package]] name = "rustix" -version = "0.38.29" +version = "0.38.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0a1a81a2478639a14e68937903356dbac62cf52171148924f754bb8a8cd7a96c" +checksum = "322394588aaf33c24007e8bb3238ee3e4c5c09c084ab32bc73890b99ff326bca" dependencies = [ "bitflags 2.4.1", "errno", diff --git a/nautilus_core/adapters/Cargo.toml b/nautilus_core/adapters/Cargo.toml index 1e6e7f5927a6..1fb2e3087f3b 100644 --- a/nautilus_core/adapters/Cargo.toml +++ b/nautilus_core/adapters/Cargo.toml @@ -22,11 +22,15 @@ pyo3 = { workspace = true, optional = true } rand = { workspace = true } rust_decimal = { workspace = true } rust_decimal_macros = { workspace = true } +serde = { workspace = true } +serde_json = { workspace = true } tokio = { workspace = true } thiserror = { workspace = true } ustr = { workspace = true } databento = { version = "0.5.0", optional = true } dbn = { version = "0.14.1", optional = true, features = ["python"] } +streaming-iterator = "0.1.9" +time = "0.3.31" [features] extension-module = [ diff --git a/nautilus_core/adapters/src/databento/loader.rs b/nautilus_core/adapters/src/databento/loader.rs index 97d459d8d1e8..ecf1ccb556e4 100644 --- a/nautilus_core/adapters/src/databento/loader.rs +++ b/nautilus_core/adapters/src/databento/loader.rs @@ -12,3 +12,215 @@ // See the License for the specific language governing permissions and // limitations under the License. // ------------------------------------------------------------------------------------------------- + +use std::{env, fs, path::PathBuf}; + +use anyhow::{bail, Result}; +use databento::dbn; +use dbn::{ + decode::{dbn::Decoder, DecodeDbn}, + Record, +}; +use indexmap::IndexMap; +use nautilus_model::{ + data::Data, + identifiers::{instrument_id::InstrumentId, symbol::Symbol, venue::Venue}, +}; +use pyo3::prelude::*; +use streaming_iterator::StreamingIterator; +use time; +use ustr::Ustr; + +use super::{parsing::parse_record, types::DatabentoPublisher}; + +pub type PublisherId = u16; +pub type Dataset = Ustr; + +/// Provides a Nautilus data loader for Databento Binary Encoding (DBN) format data. +/// +/// # Supported schemas: +/// - MBO -> `OrderBookDelta` +/// - MBP_1 -> `QuoteTick` | `TradeTick` +/// - MBP_10 -> `OrderBookDepth10` +/// - TBBO -> `QuoteTick` | `TradeTick` +/// - TRADES -> `TradeTick` +/// - OHLCV_1S -> `Bar` +/// - OHLCV_1M -> `Bar` +/// - OHLCV_1H -> `Bar` +/// - OHLCV_1D -> `Bar` +/// - DEFINITION -> `Instrument` +/// - IMBALANCE -> `DatabentoImbalance` +/// - STATISTICS -> `DatabentoStatistics` +/// +/// For the loader to work correctly, you must first either: +/// - Load Databento instrument definitions from a DBN file using `load_instruments(...)` +/// - Manually add Nautilus instrument objects through `add_instruments(...)` +/// +/// # Warnings +/// The following Databento instrument classes are not supported: +/// - ``FUTURE_SPREAD`` +/// - ``OPTION_SPEAD`` +/// - ``MIXED_SPREAD`` +/// - ``FX_SPOT`` +/// +/// # References +/// https://docs.databento.com/knowledge-base/new-users/dbn-encoding +#[cfg_attr( + feature = "python", + pyclass(module = "nautilus_trader.core.nautilus_pyo3.databento") +)] +pub struct DatabentoDataLoader { + publishers: IndexMap, + venue_dataset: IndexMap, +} + +impl DatabentoDataLoader { + pub fn new(path: Option) -> Result { + let mut loader = Self { + publishers: IndexMap::new(), + venue_dataset: IndexMap::new(), + }; + + // Load publishers + let publishers_path = match path { + Some(p) => p, + None => { + // Use built-in publishers path + let mut exe_path = env::current_exe()?; + exe_path.pop(); + exe_path.push("publishers.json"); + exe_path + } + }; + + loader.load_publishers(publishers_path)?; + + Ok(loader) + } + + /// Load the publishers data from the file at the given `path`. + pub fn load_publishers(&mut self, path: PathBuf) -> Result<()> { + let file_content = fs::read_to_string(path)?; + let publishers: Vec = serde_json::from_str(&file_content)?; + + self.publishers = publishers + .clone() + .into_iter() + .map(|p| (p.publisher_id, p)) + .collect::>(); + + self.venue_dataset = publishers + .iter() + .map(|p| { + ( + Venue::from(p.venue.as_str()), + Dataset::from(p.dataset.as_str()), + ) + }) + .collect::>(); + + Ok(()) + } + + /// Return the internal Databento publishers currently held by the loader. + pub fn get_publishers(&self) -> &IndexMap { + &self.publishers + } + + // Return the dataset which matches the given `venue` (if found). + pub fn get_dataset_for_venue(&self, venue: &Venue) -> Option<&Dataset> { + self.venue_dataset.get(venue) + } + + pub fn get_nautilus_instrument_id_for_record( + &self, + record: &dbn::RecordRef, + metadata: &dbn::Metadata, + ) -> Result { + let (publisher_id, instrument_id, nanoseconds) = match record.rtype()? { + dbn::RType::Mbo => { + let msg = record.get::().unwrap(); // SAFETY: RType known + (msg.hd.publisher_id, msg.hd.instrument_id, msg.ts_recv) + } + dbn::RType::Mbp0 => { + let msg = record.get::().unwrap(); // SAFETY: RType known + (msg.hd.publisher_id, msg.hd.instrument_id, msg.ts_recv) + } + dbn::RType::Mbp1 => { + let msg = record.get::().unwrap(); // SAFETY: RType known + (msg.hd.publisher_id, msg.hd.instrument_id, msg.ts_recv) + } + dbn::RType::Mbp10 => { + let msg = record.get::().unwrap(); // SAFETY: RType known + (msg.hd.publisher_id, msg.hd.instrument_id, msg.ts_recv) + } + dbn::RType::Ohlcv1S + | dbn::RType::Ohlcv1M + | dbn::RType::Ohlcv1H + | dbn::RType::Ohlcv1D + | dbn::RType::OhlcvEod => { + let msg = record.get::().unwrap(); // SAFETY: RType known + (msg.hd.publisher_id, msg.hd.instrument_id, msg.hd.ts_event) + } + _ => bail!("RType is currently unsupported by NautilusTrader"), + }; + + let duration = time::Duration::nanoseconds(nanoseconds as i64); + let datetime = time::OffsetDateTime::UNIX_EPOCH + .checked_add(duration) + .unwrap(); + let date = datetime.date(); + let symbol_map = metadata.symbol_map_for_date(date)?; + let raw_symbol = symbol_map + .get(instrument_id) + .expect("No raw symbol found for {instrument_id}"); + + let symbol = Symbol { + value: Ustr::from(raw_symbol), + }; + let venue_str = self.publishers.get(&publisher_id).unwrap().venue.as_str(); + let venue = Venue { + value: Ustr::from(venue_str), + }; + + Ok(InstrumentId::new(symbol, venue)) + } + + pub fn schema_from_file(&self, path: PathBuf) -> Result { + let decoder = Decoder::from_zstd_file(path)?; + let metadata = decoder.metadata(); + Ok(metadata + .schema + .expect("Mixed schemas are not currently supported.")) + } + + pub fn read_records( + &self, + path: PathBuf, + ) -> Result)>> + '_> + where + T: dbn::Record + dbn::HasRType + 'static, + { + let decoder = Decoder::from_zstd_file(path)?; + let metadata = decoder.metadata().clone(); + let mut dbn_stream = decoder.decode_stream::(); + + Ok(std::iter::from_fn(move || { + dbn_stream.advance(); + match dbn_stream.get() { + Some(record) => { + let rec_ref = dbn::RecordRef::from(record); + let instrument_id = self + .get_nautilus_instrument_id_for_record(&rec_ref, &metadata) + .unwrap(); + + match parse_record(&rec_ref, instrument_id, 0) { + Ok(data) => Some(Ok(data)), + Err(e) => Some(Err(e)), + } + } + None => None, + } + })) + } +} diff --git a/nautilus_core/adapters/src/databento/mod.rs b/nautilus_core/adapters/src/databento/mod.rs index 992e37bfec44..0f1c28d8944c 100644 --- a/nautilus_core/adapters/src/databento/mod.rs +++ b/nautilus_core/adapters/src/databento/mod.rs @@ -14,7 +14,6 @@ // ------------------------------------------------------------------------------------------------- pub mod common; -pub mod historical; pub mod loader; pub mod parsing; pub mod types; diff --git a/nautilus_core/adapters/src/databento/parsing.rs b/nautilus_core/adapters/src/databento/parsing.rs index b9ef037673f7..d22c791495ac 100644 --- a/nautilus_core/adapters/src/databento/parsing.rs +++ b/nautilus_core/adapters/src/databento/parsing.rs @@ -22,6 +22,7 @@ use std::{ use anyhow::{anyhow, bail, Result}; use databento::dbn; +use dbn::Record; use itoa; use nautilus_core::{datetime::NANOSECONDS_IN_SECOND, time::UnixNanos}; use nautilus_model::{ @@ -490,33 +491,14 @@ pub fn parse_ohlcv_msg( Ok(bar) } -// pub fn parse_record_with_metadata( -// record: T, -// publishers: IndexMap, -// ts_init: UnixNanos, -// ) -> Result<(Data, Option)> { -// let publisher_id: PublisherId = record.header().publisher_id; -// let publisher = publishers -// .get(&record.header().publisher_id) -// .ok_or_else(|| anyhow!("Publisher ID {publisher_id} not found in map"))?; -// -// let raw_symbol = unsafe { parse_raw_ptr_to_ustr(record.raw_symbol.as_ptr())? }; -// let instrument_id = nautilus_instrument_id_from_databento(raw_symbol, publisher); -// } - -pub fn parse_record( - record: T, +pub fn parse_record( + record: &dbn::RecordRef, instrument_id: InstrumentId, ts_init: UnixNanos, -) -> Result<(Data, Option)> -where - T: dbn::Record + dbn::HasRType, -{ - let record_ref = dbn::RecordRef::from(&record); - +) -> Result<(Data, Option)> { let result = match record.rtype()? { dbn::RType::Mbo => { - let msg = record_ref.get::().unwrap(); // SAFETY: RType known + let msg = record.get::().unwrap(); // SAFETY: RType known let result = parse_mbo_msg(msg, instrument_id, 2, ts_init)?; match result { (Some(delta), None) => (Data::Delta(delta), None), @@ -525,12 +507,12 @@ where } } dbn::RType::Mbp0 => { - let msg = record_ref.get::().unwrap(); // SAFETY: RType known + let msg = record.get::().unwrap(); // SAFETY: RType known let trade = parse_trade_msg(msg, instrument_id, 2, ts_init)?; (Data::Trade(trade), None) } dbn::RType::Mbp1 => { - let msg = record_ref.get::().unwrap(); // SAFETY: RType known + let msg = record.get::().unwrap(); // SAFETY: RType known let result = parse_mbp1_msg(msg, instrument_id, 2, ts_init)?; match result { (quote, None) => (Data::Quote(quote), None), @@ -538,7 +520,7 @@ where } } dbn::RType::Mbp10 => { - let msg = record_ref.get::().unwrap(); // SAFETY: RType known + let msg = record.get::().unwrap(); // SAFETY: RType known let depth = parse_mbp10_msg(msg, instrument_id, 2, ts_init)?; (Data::Depth10(depth), None) } @@ -547,7 +529,7 @@ where | dbn::RType::Ohlcv1H | dbn::RType::Ohlcv1D | dbn::RType::OhlcvEod => { - let msg = record_ref.get::().unwrap(); // SAFETY: RType known + let msg = record.get::().unwrap(); // SAFETY: RType known let bar = parse_ohlcv_msg(msg, instrument_id, 2, ts_init)?; (Data::Bar(bar), None) } diff --git a/nautilus_core/adapters/src/databento/historical.rs b/nautilus_core/adapters/src/databento/python/loader.rs similarity index 71% rename from nautilus_core/adapters/src/databento/historical.rs rename to nautilus_core/adapters/src/databento/python/loader.rs index 97d459d8d1e8..27cff1aed8f3 100644 --- a/nautilus_core/adapters/src/databento/historical.rs +++ b/nautilus_core/adapters/src/databento/python/loader.rs @@ -12,3 +12,18 @@ // See the License for the specific language governing permissions and // limitations under the License. // ------------------------------------------------------------------------------------------------- + +use std::path::PathBuf; + +use nautilus_core::python::to_pyvalue_err; +use pyo3::prelude::*; + +use crate::databento::loader::DatabentoDataLoader; + +#[pymethods] +impl DatabentoDataLoader { + #[new] + pub fn py_new(path: Option) -> PyResult { + DatabentoDataLoader::new(path.map(PathBuf::from)).map_err(to_pyvalue_err) + } +} diff --git a/nautilus_core/adapters/src/databento/python/mod.rs b/nautilus_core/adapters/src/databento/python/mod.rs index 00563cd1c676..7cca0dcab793 100644 --- a/nautilus_core/adapters/src/databento/python/mod.rs +++ b/nautilus_core/adapters/src/databento/python/mod.rs @@ -13,4 +13,5 @@ // limitations under the License. // ------------------------------------------------------------------------------------------------- +pub mod loader; pub mod parsing; diff --git a/nautilus_core/adapters/src/databento/types.rs b/nautilus_core/adapters/src/databento/types.rs index fd6bbff3b34a..70f8307a0bd6 100644 --- a/nautilus_core/adapters/src/databento/types.rs +++ b/nautilus_core/adapters/src/databento/types.rs @@ -14,10 +14,12 @@ // ------------------------------------------------------------------------------------------------- use databento::dbn; +use serde::Deserialize; /// Represents a Databento publisher ID. pub type PublisherId = u16; +#[derive(Clone, Debug, PartialEq, Eq, Hash, Deserialize)] pub struct DatabentoPublisher { pub publisher_id: PublisherId, pub dataset: dbn::Dataset, diff --git a/nautilus_trader/adapters/databento/loaders.py b/nautilus_trader/adapters/databento/loaders.py index dc5251097507..b0af7594bf0a 100644 --- a/nautilus_trader/adapters/databento/loaders.py +++ b/nautilus_trader/adapters/databento/loaders.py @@ -38,7 +38,7 @@ class DatabentoDataLoader: Supported schemas: - MBO -> `OrderBookDelta` - MBP_1 -> `QuoteTick` | `TradeTick` - - MBP_10 -> `OrderBookDeltas` (as snapshots) + - MBP_10 -> `OrderBookDepth10` - TBBO -> `QuoteTick` | `TradeTick` - TRADES -> `TradeTick` - OHLCV_1S -> `Bar` @@ -55,7 +55,7 @@ class DatabentoDataLoader: Warnings -------- - The following Databento instrument classes are not supported: + The following Databento instrument classes are not currently supported: - ``FUTURE_SPREAD`` - ``OPTION_SPEAD`` - ``MIXED_SPREAD`` @@ -105,27 +105,6 @@ def instruments(self) -> dict[InstrumentId, Instrument]: """ return self._instruments - def get_venue_for_dataset(self, dataset: str) -> Venue: - """ - Return a venue for the given `dataset`. - - Parameters - ---------- - dataset : str - The dataset for the venue. - - Returns - ------- - Venue - - Raises - ------ - KeyError - If `dataset` is not in the map of publishers. - - """ - return self._dataset_venue[dataset] - def get_dataset_for_venue(self, venue: Venue) -> str: """ Return a dataset for the given `venue`. @@ -168,7 +147,6 @@ def load_publishers(self, path: PathLike[str] | str) -> None: publishers: list[DatabentoPublisher] = decoder.decode(path.read_bytes()) self._publishers = {p.publisher_id: p for p in publishers} - self._dataset_venue: dict[str, Venue] = {p.dataset: Venue(p.venue) for p in publishers} self._venue_dataset: dict[Venue, str] = {Venue(p.venue): p.dataset for p in publishers} def load_instruments(self, path: PathLike[str] | str) -> None: