Skip to content

Commit

Permalink
Continue Databento loading and parsing in Rust
Browse files Browse the repository at this point in the history
  • Loading branch information
cjdsellers committed Jan 13, 2024
1 parent 634ab88 commit 24e14b8
Show file tree
Hide file tree
Showing 9 changed files with 251 additions and 54 deletions.
8 changes: 6 additions & 2 deletions nautilus_core/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions nautilus_core/adapters/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,15 @@ pyo3 = { workspace = true, optional = true }
rand = { workspace = true }
rust_decimal = { workspace = true }
rust_decimal_macros = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
tokio = { workspace = true }
thiserror = { workspace = true }
ustr = { workspace = true }
databento = { version = "0.5.0", optional = true }
dbn = { version = "0.14.1", optional = true, features = ["python"] }
streaming-iterator = "0.1.9"
time = "0.3.31"

[features]
extension-module = [
Expand Down
212 changes: 212 additions & 0 deletions nautilus_core/adapters/src/databento/loader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,215 @@
// See the License for the specific language governing permissions and
// limitations under the License.
// -------------------------------------------------------------------------------------------------

use std::{env, fs, path::PathBuf};

use anyhow::{bail, Result};
use databento::dbn;
use dbn::{
decode::{dbn::Decoder, DecodeDbn},
Record,
};
use indexmap::IndexMap;
use nautilus_model::{
data::Data,
identifiers::{instrument_id::InstrumentId, symbol::Symbol, venue::Venue},
};
use pyo3::prelude::*;
use streaming_iterator::StreamingIterator;
use time;
use ustr::Ustr;

use super::{parsing::parse_record, types::DatabentoPublisher};

pub type PublisherId = u16;
pub type Dataset = Ustr;

/// Provides a Nautilus data loader for Databento Binary Encoding (DBN) format data.
///
/// # Supported schemas:
/// - MBO -> `OrderBookDelta`
/// - MBP_1 -> `QuoteTick` | `TradeTick`
/// - MBP_10 -> `OrderBookDepth10`
/// - TBBO -> `QuoteTick` | `TradeTick`
/// - TRADES -> `TradeTick`
/// - OHLCV_1S -> `Bar`
/// - OHLCV_1M -> `Bar`
/// - OHLCV_1H -> `Bar`
/// - OHLCV_1D -> `Bar`
/// - DEFINITION -> `Instrument`
/// - IMBALANCE -> `DatabentoImbalance`
/// - STATISTICS -> `DatabentoStatistics`
///
/// For the loader to work correctly, you must first either:
/// - Load Databento instrument definitions from a DBN file using `load_instruments(...)`
/// - Manually add Nautilus instrument objects through `add_instruments(...)`
///
/// # Warnings
/// The following Databento instrument classes are not supported:
/// - ``FUTURE_SPREAD``
/// - ``OPTION_SPEAD``
/// - ``MIXED_SPREAD``
/// - ``FX_SPOT``
///
/// # References
/// https://docs.databento.com/knowledge-base/new-users/dbn-encoding
#[cfg_attr(
feature = "python",
pyclass(module = "nautilus_trader.core.nautilus_pyo3.databento")
)]
pub struct DatabentoDataLoader {
publishers: IndexMap<PublisherId, DatabentoPublisher>,
venue_dataset: IndexMap<Venue, Dataset>,
}

impl DatabentoDataLoader {
pub fn new(path: Option<PathBuf>) -> Result<Self> {
let mut loader = Self {
publishers: IndexMap::new(),
venue_dataset: IndexMap::new(),
};

// Load publishers
let publishers_path = match path {
Some(p) => p,
None => {
// Use built-in publishers path
let mut exe_path = env::current_exe()?;
exe_path.pop();
exe_path.push("publishers.json");
exe_path
}
};

loader.load_publishers(publishers_path)?;

Ok(loader)
}

/// Load the publishers data from the file at the given `path`.
pub fn load_publishers(&mut self, path: PathBuf) -> Result<()> {
let file_content = fs::read_to_string(path)?;
let publishers: Vec<DatabentoPublisher> = serde_json::from_str(&file_content)?;

self.publishers = publishers
.clone()
.into_iter()
.map(|p| (p.publisher_id, p))
.collect::<IndexMap<u16, DatabentoPublisher>>();

self.venue_dataset = publishers
.iter()
.map(|p| {
(
Venue::from(p.venue.as_str()),
Dataset::from(p.dataset.as_str()),
)
})
.collect::<IndexMap<Venue, Ustr>>();

Ok(())
}

/// Return the internal Databento publishers currently held by the loader.
pub fn get_publishers(&self) -> &IndexMap<u16, DatabentoPublisher> {
&self.publishers
}

// Return the dataset which matches the given `venue` (if found).
pub fn get_dataset_for_venue(&self, venue: &Venue) -> Option<&Dataset> {
self.venue_dataset.get(venue)
}

pub fn get_nautilus_instrument_id_for_record(
&self,
record: &dbn::RecordRef,
metadata: &dbn::Metadata,
) -> Result<InstrumentId> {
let (publisher_id, instrument_id, nanoseconds) = match record.rtype()? {
dbn::RType::Mbo => {
let msg = record.get::<dbn::MboMsg>().unwrap(); // SAFETY: RType known
(msg.hd.publisher_id, msg.hd.instrument_id, msg.ts_recv)
}
dbn::RType::Mbp0 => {
let msg = record.get::<dbn::TradeMsg>().unwrap(); // SAFETY: RType known
(msg.hd.publisher_id, msg.hd.instrument_id, msg.ts_recv)
}
dbn::RType::Mbp1 => {
let msg = record.get::<dbn::Mbp1Msg>().unwrap(); // SAFETY: RType known
(msg.hd.publisher_id, msg.hd.instrument_id, msg.ts_recv)
}
dbn::RType::Mbp10 => {
let msg = record.get::<dbn::Mbp10Msg>().unwrap(); // SAFETY: RType known
(msg.hd.publisher_id, msg.hd.instrument_id, msg.ts_recv)
}
dbn::RType::Ohlcv1S
| dbn::RType::Ohlcv1M
| dbn::RType::Ohlcv1H
| dbn::RType::Ohlcv1D
| dbn::RType::OhlcvEod => {
let msg = record.get::<dbn::OhlcvMsg>().unwrap(); // SAFETY: RType known
(msg.hd.publisher_id, msg.hd.instrument_id, msg.hd.ts_event)
}
_ => bail!("RType is currently unsupported by NautilusTrader"),
};

let duration = time::Duration::nanoseconds(nanoseconds as i64);
let datetime = time::OffsetDateTime::UNIX_EPOCH
.checked_add(duration)
.unwrap();
let date = datetime.date();
let symbol_map = metadata.symbol_map_for_date(date)?;
let raw_symbol = symbol_map
.get(instrument_id)
.expect("No raw symbol found for {instrument_id}");

let symbol = Symbol {
value: Ustr::from(raw_symbol),
};
let venue_str = self.publishers.get(&publisher_id).unwrap().venue.as_str();
let venue = Venue {
value: Ustr::from(venue_str),
};

Ok(InstrumentId::new(symbol, venue))
}

pub fn schema_from_file(&self, path: PathBuf) -> Result<dbn::Schema> {
let decoder = Decoder::from_zstd_file(path)?;
let metadata = decoder.metadata();
Ok(metadata
.schema
.expect("Mixed schemas are not currently supported."))
}

pub fn read_records<T>(
&self,
path: PathBuf,
) -> Result<impl Iterator<Item = Result<(Data, Option<Data>)>> + '_>
where
T: dbn::Record + dbn::HasRType + 'static,
{
let decoder = Decoder::from_zstd_file(path)?;
let metadata = decoder.metadata().clone();
let mut dbn_stream = decoder.decode_stream::<T>();

Ok(std::iter::from_fn(move || {
dbn_stream.advance();
match dbn_stream.get() {
Some(record) => {
let rec_ref = dbn::RecordRef::from(record);
let instrument_id = self
.get_nautilus_instrument_id_for_record(&rec_ref, &metadata)
.unwrap();

match parse_record(&rec_ref, instrument_id, 0) {
Ok(data) => Some(Ok(data)),
Err(e) => Some(Err(e)),
}
}
None => None,
}
}))
}
}
1 change: 0 additions & 1 deletion nautilus_core/adapters/src/databento/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
// -------------------------------------------------------------------------------------------------

pub mod common;
pub mod historical;
pub mod loader;
pub mod parsing;
pub mod types;
Expand Down
36 changes: 9 additions & 27 deletions nautilus_core/adapters/src/databento/parsing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use std::{

use anyhow::{anyhow, bail, Result};
use databento::dbn;
use dbn::Record;
use itoa;
use nautilus_core::{datetime::NANOSECONDS_IN_SECOND, time::UnixNanos};
use nautilus_model::{
Expand Down Expand Up @@ -490,33 +491,14 @@ pub fn parse_ohlcv_msg(
Ok(bar)
}

// pub fn parse_record_with_metadata<T>(
// record: T,
// publishers: IndexMap<PublisherId, DatabentoPublisher>,
// ts_init: UnixNanos,
// ) -> Result<(Data, Option<Data>)> {
// let publisher_id: PublisherId = record.header().publisher_id;
// let publisher = publishers
// .get(&record.header().publisher_id)
// .ok_or_else(|| anyhow!("Publisher ID {publisher_id} not found in map"))?;
//
// let raw_symbol = unsafe { parse_raw_ptr_to_ustr(record.raw_symbol.as_ptr())? };
// let instrument_id = nautilus_instrument_id_from_databento(raw_symbol, publisher);
// }

pub fn parse_record<T>(
record: T,
pub fn parse_record(
record: &dbn::RecordRef,
instrument_id: InstrumentId,
ts_init: UnixNanos,
) -> Result<(Data, Option<Data>)>
where
T: dbn::Record + dbn::HasRType,
{
let record_ref = dbn::RecordRef::from(&record);

) -> Result<(Data, Option<Data>)> {
let result = match record.rtype()? {
dbn::RType::Mbo => {
let msg = record_ref.get::<dbn::MboMsg>().unwrap(); // SAFETY: RType known
let msg = record.get::<dbn::MboMsg>().unwrap(); // SAFETY: RType known
let result = parse_mbo_msg(msg, instrument_id, 2, ts_init)?;
match result {
(Some(delta), None) => (Data::Delta(delta), None),
Expand All @@ -525,20 +507,20 @@ where
}
}
dbn::RType::Mbp0 => {
let msg = record_ref.get::<dbn::TradeMsg>().unwrap(); // SAFETY: RType known
let msg = record.get::<dbn::TradeMsg>().unwrap(); // SAFETY: RType known
let trade = parse_trade_msg(msg, instrument_id, 2, ts_init)?;
(Data::Trade(trade), None)
}
dbn::RType::Mbp1 => {
let msg = record_ref.get::<dbn::Mbp1Msg>().unwrap(); // SAFETY: RType known
let msg = record.get::<dbn::Mbp1Msg>().unwrap(); // SAFETY: RType known
let result = parse_mbp1_msg(msg, instrument_id, 2, ts_init)?;
match result {
(quote, None) => (Data::Quote(quote), None),
(quote, Some(trade)) => (Data::Quote(quote), Some(Data::Trade(trade))),
}
}
dbn::RType::Mbp10 => {
let msg = record_ref.get::<dbn::Mbp10Msg>().unwrap(); // SAFETY: RType known
let msg = record.get::<dbn::Mbp10Msg>().unwrap(); // SAFETY: RType known
let depth = parse_mbp10_msg(msg, instrument_id, 2, ts_init)?;
(Data::Depth10(depth), None)
}
Expand All @@ -547,7 +529,7 @@ where
| dbn::RType::Ohlcv1H
| dbn::RType::Ohlcv1D
| dbn::RType::OhlcvEod => {
let msg = record_ref.get::<dbn::OhlcvMsg>().unwrap(); // SAFETY: RType known
let msg = record.get::<dbn::OhlcvMsg>().unwrap(); // SAFETY: RType known
let bar = parse_ohlcv_msg(msg, instrument_id, 2, ts_init)?;
(Data::Bar(bar), None)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,18 @@
// See the License for the specific language governing permissions and
// limitations under the License.
// -------------------------------------------------------------------------------------------------

use std::path::PathBuf;

use nautilus_core::python::to_pyvalue_err;
use pyo3::prelude::*;

use crate::databento::loader::DatabentoDataLoader;

#[pymethods]
impl DatabentoDataLoader {
#[new]
pub fn py_new(path: Option<String>) -> PyResult<Self> {
DatabentoDataLoader::new(path.map(PathBuf::from)).map_err(to_pyvalue_err)
}
}
1 change: 1 addition & 0 deletions nautilus_core/adapters/src/databento/python/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,5 @@
// limitations under the License.
// -------------------------------------------------------------------------------------------------

pub mod loader;
pub mod parsing;
2 changes: 2 additions & 0 deletions nautilus_core/adapters/src/databento/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,12 @@
// -------------------------------------------------------------------------------------------------

use databento::dbn;
use serde::Deserialize;

/// Represents a Databento publisher ID.
pub type PublisherId = u16;

#[derive(Clone, Debug, PartialEq, Eq, Hash, Deserialize)]
pub struct DatabentoPublisher {
pub publisher_id: PublisherId,
pub dataset: dbn::Dataset,
Expand Down
Loading

0 comments on commit 24e14b8

Please sign in to comment.