diff --git a/nautilus_core/adapters/src/databento/python/live.rs b/nautilus_core/adapters/src/databento/python/live.rs index 40b954bdc770..ce48dc424f27 100644 --- a/nautilus_core/adapters/src/databento/python/live.rs +++ b/nautilus_core/adapters/src/databento/python/live.rs @@ -21,13 +21,12 @@ use dbn::{PitSymbolMap, Record, SymbolIndex, VersionUpgradePolicy}; use indexmap::IndexMap; use log::{error, info}; use nautilus_core::{ - ffi::cvec::CVec, python::{to_pyruntime_err, to_pyvalue_err}, time::{get_atomic_clock_realtime, AtomicTime, UnixNanos}, }; use nautilus_model::{ data::{delta::OrderBookDelta, deltas::OrderBookDeltas, Data}, - ffi::data::deltas::orderbook_deltas_new, + ffi::data::deltas::OrderBookDeltas_API, identifiers::{instrument_id::InstrumentId, symbol::Symbol, venue::Venue}, python::data::data_to_pycapsule, }; @@ -237,14 +236,11 @@ impl DatabentoLiveClient { let buffer = buffered_deltas.entry(delta.instrument_id).or_default(); buffer.push(delta); + // TODO: Temporary for debugging deltas_count += 1; println!( - "Buffering delta: {} {} {:?} flags={}, buffer_len={}", - deltas_count, - delta.ts_event, - buffering_start, - msg.flags, - buffer.len() + "Buffering delta: {} {} {:?} flags={}", + deltas_count, delta.ts_event, buffering_start, msg.flags, ); // Check if last message in the packet @@ -266,11 +262,9 @@ impl DatabentoLiveClient { } // SAFETY: We can guarantee a deltas vec exists - let instrument_id = delta.instrument_id; let buffer = buffered_deltas.remove(&delta.instrument_id).unwrap(); let deltas = OrderBookDeltas::new(delta.instrument_id, buffer); - let deltas_cvec: CVec = deltas.deltas.into(); - let deltas = orderbook_deltas_new(instrument_id, &deltas_cvec); + let deltas = OrderBookDeltas_API::new(deltas); data1 = Some(Data::Deltas(deltas)); } }; diff --git a/nautilus_trader/adapters/databento/data.py b/nautilus_trader/adapters/databento/data.py index 5ef62248918a..bdd32426e01b 100644 --- a/nautilus_trader/adapters/databento/data.py +++ b/nautilus_trader/adapters/databento/data.py @@ -41,7 +41,6 @@ from nautilus_trader.model.data import Bar from nautilus_trader.model.data import BarType from nautilus_trader.model.data import DataType -from nautilus_trader.model.data import OrderBookDelta from nautilus_trader.model.data import QuoteTick from nautilus_trader.model.data import TradeTick from nautilus_trader.model.data import capsule_to_data @@ -137,8 +136,6 @@ def __init__( self._buffer_mbo_subscriptions_task: asyncio.Task | None = None self._is_buffering_mbo_subscriptions: bool = bool(config.mbo_subscriptions_delay) self._buffered_mbo_subscriptions: dict[Dataset, list[InstrumentId]] = defaultdict(list) - self._buffered_deltas: dict[InstrumentId, list[OrderBookDelta]] = defaultdict(list) - self._buffering_replay: dict[InstrumentId, int] = {} # Tasks self._live_client_futures: set[asyncio.Future] = set() @@ -492,11 +489,6 @@ async def _subscribe_order_book_deltas_batch( ids_str = ",".join([i.value for i in instrument_ids]) self._log.info(f"Subscribing to MBO/L3 for {ids_str}.", LogColor.BLUE) - # Setup buffered start times - now = self._clock.utc_now() - for instrument_id in instrument_ids: - self._buffering_replay[instrument_id] = now.value - dataset: Dataset = self._loader.get_dataset_for_venue(instrument_ids[0].venue) live_client = self._get_live_client_mbo(dataset)