Skip to content

Commit

Permalink
Cleanup Databento MBO buffering
Browse files Browse the repository at this point in the history
  • Loading branch information
cjdsellers committed Feb 23, 2024
1 parent 06dc861 commit 9816f58
Show file tree
Hide file tree
Showing 2 changed files with 5 additions and 19 deletions.
16 changes: 5 additions & 11 deletions nautilus_core/adapters/src/databento/python/live.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,12 @@ use dbn::{PitSymbolMap, Record, SymbolIndex, VersionUpgradePolicy};
use indexmap::IndexMap;
use log::{error, info};
use nautilus_core::{
ffi::cvec::CVec,
python::{to_pyruntime_err, to_pyvalue_err},
time::{get_atomic_clock_realtime, AtomicTime, UnixNanos},
};
use nautilus_model::{
data::{delta::OrderBookDelta, deltas::OrderBookDeltas, Data},
ffi::data::deltas::orderbook_deltas_new,
ffi::data::deltas::OrderBookDeltas_API,
identifiers::{instrument_id::InstrumentId, symbol::Symbol, venue::Venue},
python::data::data_to_pycapsule,
};
Expand Down Expand Up @@ -237,14 +236,11 @@ impl DatabentoLiveClient {
let buffer = buffered_deltas.entry(delta.instrument_id).or_default();
buffer.push(delta);

// TODO: Temporary for debugging
deltas_count += 1;
println!(
"Buffering delta: {} {} {:?} flags={}, buffer_len={}",
deltas_count,
delta.ts_event,
buffering_start,
msg.flags,
buffer.len()
"Buffering delta: {} {} {:?} flags={}",
deltas_count, delta.ts_event, buffering_start, msg.flags,
);

// Check if last message in the packet
Expand All @@ -266,11 +262,9 @@ impl DatabentoLiveClient {
}

// SAFETY: We can guarantee a deltas vec exists
let instrument_id = delta.instrument_id;
let buffer = buffered_deltas.remove(&delta.instrument_id).unwrap();
let deltas = OrderBookDeltas::new(delta.instrument_id, buffer);
let deltas_cvec: CVec = deltas.deltas.into();
let deltas = orderbook_deltas_new(instrument_id, &deltas_cvec);
let deltas = OrderBookDeltas_API::new(deltas);
data1 = Some(Data::Deltas(deltas));
}
};
Expand Down
8 changes: 0 additions & 8 deletions nautilus_trader/adapters/databento/data.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@
from nautilus_trader.model.data import Bar
from nautilus_trader.model.data import BarType
from nautilus_trader.model.data import DataType
from nautilus_trader.model.data import OrderBookDelta
from nautilus_trader.model.data import QuoteTick
from nautilus_trader.model.data import TradeTick
from nautilus_trader.model.data import capsule_to_data
Expand Down Expand Up @@ -137,8 +136,6 @@ def __init__(
self._buffer_mbo_subscriptions_task: asyncio.Task | None = None
self._is_buffering_mbo_subscriptions: bool = bool(config.mbo_subscriptions_delay)
self._buffered_mbo_subscriptions: dict[Dataset, list[InstrumentId]] = defaultdict(list)
self._buffered_deltas: dict[InstrumentId, list[OrderBookDelta]] = defaultdict(list)
self._buffering_replay: dict[InstrumentId, int] = {}

# Tasks
self._live_client_futures: set[asyncio.Future] = set()
Expand Down Expand Up @@ -492,11 +489,6 @@ async def _subscribe_order_book_deltas_batch(
ids_str = ",".join([i.value for i in instrument_ids])
self._log.info(f"Subscribing to MBO/L3 for {ids_str}.", LogColor.BLUE)

# Setup buffered start times
now = self._clock.utc_now()
for instrument_id in instrument_ids:
self._buffering_replay[instrument_id] = now.value

dataset: Dataset = self._loader.get_dataset_for_venue(instrument_ids[0].venue)
live_client = self._get_live_client_mbo(dataset)

Expand Down

0 comments on commit 9816f58

Please sign in to comment.