Skip to content

Commit

Permalink
Continue Databento parsing in Rust
Browse files Browse the repository at this point in the history
  • Loading branch information
cjdsellers committed Jan 1, 2024
1 parent 46f1723 commit ede944e
Showing 1 changed file with 56 additions and 23 deletions.
79 changes: 56 additions & 23 deletions nautilus_core/adapters/src/databento/parsing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use std::{
use anyhow::{anyhow, bail, Result};
use dbn;
use itoa;
use nautilus_core::{datetime::secs_to_nanos, time::UnixNanos};
use nautilus_core::{datetime::NANOSECONDS_IN_SECOND, time::UnixNanos};
use nautilus_model::{
data::{
bar::{Bar, BarSpecification, BarType},
Expand All @@ -48,6 +48,32 @@ use ustr::Ustr;

use super::{common::nautilus_instrument_id_from_databento, types::DatabentoPublisher};

const BAR_SPEC_1S: BarSpecification = BarSpecification {
step: 1,
aggregation: BarAggregation::Second,
price_type: PriceType::Last,
};
const BAR_SPEC_1M: BarSpecification = BarSpecification {
step: 1,
aggregation: BarAggregation::Minute,
price_type: PriceType::Last,
};
const BAR_SPEC_1H: BarSpecification = BarSpecification {
step: 1,
aggregation: BarAggregation::Hour,
price_type: PriceType::Last,
};
const BAR_SPEC_1D: BarSpecification = BarSpecification {
step: 1,
aggregation: BarAggregation::Day,
price_type: PriceType::Last,
};

const BAR_CLOSE_ADJUSTMENT_1S: u64 = NANOSECONDS_IN_SECOND;
const BAR_CLOSE_ADJUSTMENT_1M: u64 = NANOSECONDS_IN_SECOND * 60;
const BAR_CLOSE_ADJUSTMENT_1H: u64 = NANOSECONDS_IN_SECOND * 60 * 60;
const BAR_CLOSE_ADJUSTMENT_1D: u64 = NANOSECONDS_IN_SECOND * 60 * 60 * 24;

pub fn parse_order_side(c: c_char) -> OrderSide {
match c as u8 as char {
'A' => OrderSide::Sell,
Expand Down Expand Up @@ -381,70 +407,68 @@ pub fn parse_mbp10_msg(
}

pub fn parse_bar_type(record: &dbn::OhlcvMsg, instrument_id: InstrumentId) -> Result<BarType> {
match record.hd.rtype {
let bar_type = match record.hd.rtype {
32 => {
// ohlcv-1s
let bar_spec = BarSpecification::new(1, BarAggregation::Second, PriceType::Last);
let bar_type = BarType::new(instrument_id, bar_spec, AggregationSource::External);
Ok(bar_type)
BarType::new(instrument_id, BAR_SPEC_1S, AggregationSource::External)
}
33 => {
// ohlcv-1m
let bar_spec = BarSpecification::new(1, BarAggregation::Minute, PriceType::Last);
let bar_type = BarType::new(instrument_id, bar_spec, AggregationSource::External);
Ok(bar_type)
BarType::new(instrument_id, BAR_SPEC_1M, AggregationSource::External)
}
34 => {
// ohlcv-1h
let bar_spec = BarSpecification::new(1, BarAggregation::Hour, PriceType::Last);
let bar_type = BarType::new(instrument_id, bar_spec, AggregationSource::External);
Ok(bar_type)
BarType::new(instrument_id, BAR_SPEC_1H, AggregationSource::External)
}
35 => {
// ohlcv-1d
let bar_spec = BarSpecification::new(1, BarAggregation::Day, PriceType::Last);
let bar_type = BarType::new(instrument_id, bar_spec, AggregationSource::External);
Ok(bar_type)
BarType::new(instrument_id, BAR_SPEC_1D, AggregationSource::External)
}
_ => bail!(
"`rtype` is not a supported bar aggregation, was {}",
record.hd.rtype
),
}
};

Ok(bar_type)
}

pub fn parse_ts_event_adjustment(record: &dbn::OhlcvMsg) -> Result<UnixNanos> {
match record.hd.rtype {
let adjustment = match record.hd.rtype {
32 => {
// ohlcv-1s
Ok(secs_to_nanos(1.0))
BAR_CLOSE_ADJUSTMENT_1S
}
33 => {
// ohlcv-1m
Ok(secs_to_nanos(60.0))
BAR_CLOSE_ADJUSTMENT_1M
}
34 => {
// ohlcv-1h
Ok(secs_to_nanos(60.0 * 60.0))
BAR_CLOSE_ADJUSTMENT_1H
}
35 => {
// ohlcv-1d
Ok(secs_to_nanos(60.0 * 60.0 * 24.0))
BAR_CLOSE_ADJUSTMENT_1D
}
_ => bail!(
"`rtype` is not a supported bar aggregation, was {}",
record.hd.rtype
),
}
};

Ok(adjustment)
}

pub fn parse_ohlcv_msg(
record: &dbn::OhlcvMsg,
bar_type: BarType,
instrument_id: InstrumentId,
price_precision: u8,
ts_event_adjustment: UnixNanos,
ts_init: UnixNanos,
) -> Result<Bar> {
let bar_type = parse_bar_type(record, instrument_id)?;
let ts_event_adjustment = parse_ts_event_adjustment(record)?;

// Adjust `ts_event` from open to close of bar
let ts_event = record.hd.ts_event + ts_event_adjustment;
let ts_init = cmp::max(ts_init, ts_event);
Expand Down Expand Up @@ -510,6 +534,15 @@ where
(quote, Some(trade)) => (Data::Quote(quote), Some(Data::Trade(trade))),
}
}
dbn::RType::Ohlcv1S
| dbn::RType::Ohlcv1M
| dbn::RType::Ohlcv1H
| dbn::RType::Ohlcv1D
| dbn::RType::OhlcvEod => {
let msg = record_ref.get::<dbn::OhlcvMsg>().unwrap(); // SAFETY: RType known
let bar = parse_ohlcv_msg(msg, instrument_id, 2, ts_init)?;
(Data::Bar(bar), None)
}
// dbn::RType::Mbp10 => {
// let msg = record_ref.get::<dbn::Mbp10Msg>().unwrap(); // SAFETY: RType known
// let trade = parse_mbp10_msg(msg, instrument_id, 2, ts_init)?;
Expand Down

0 comments on commit ede944e

Please sign in to comment.