Skip to content

Commit

Permalink
Implement Databento MBO message F_LAST buffering
Browse files Browse the repository at this point in the history
  • Loading branch information
cjdsellers committed Dec 29, 2023
1 parent b68ae9d commit 4c4b4a7
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 9 deletions.
16 changes: 15 additions & 1 deletion nautilus_trader/adapters/databento/data.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@
from nautilus_trader.model.data import Bar
from nautilus_trader.model.data import BarType
from nautilus_trader.model.data import DataType
from nautilus_trader.model.data import OrderBookDelta
from nautilus_trader.model.data import OrderBookDeltas
from nautilus_trader.model.data import QuoteTick
from nautilus_trader.model.data import TradeTick
from nautilus_trader.model.enums import BookType
Expand Down Expand Up @@ -133,9 +135,10 @@ def __init__(
self._instrument_ids[dataset].add(instrument_id)

# MBO/L3 subscription buffering
self._buffer_mbo_subscriptions_task: asyncio.Task | None = None
self._is_buffering_mbo_subscriptions: bool = bool(config.mbo_subscriptions_delay)
self._buffered_mbo_subscriptions: dict[Dataset, list[InstrumentId]] = defaultdict(list)
self._buffer_mbo_subscriptions_task: asyncio.Task | None = None
self._buffered_deltas: dict[InstrumentId, list[OrderBookDelta]] = defaultdict(list)

async def _connect(self) -> None:
if not self._instrument_ids:
Expand Down Expand Up @@ -797,6 +800,17 @@ def _handle_record(
self._log.error(f"{e!r}")
return

if isinstance(data, OrderBookDelta):
if databento.RecordFlags.F_LAST not in databento.RecordFlags(data.flags):
buffer = self._buffered_deltas[data.instrument_id]
buffer.append(data)
return # We can rely on the F_LAST flag for an MBO feed
else:
buffer = self._buffered_deltas[data.instrument_id]
buffer.append(data)
data = OrderBookDeltas(instrument_id, deltas=buffer.copy())
buffer.clear()

if isinstance(data, tuple):
self._handle_data(data[0])
self._handle_data(data[1])
Expand Down
15 changes: 7 additions & 8 deletions nautilus_trader/adapters/databento/parsing.py
Original file line number Diff line number Diff line change
Expand Up @@ -233,38 +233,37 @@ def parse_mbo_msg(
record: databento.MBOMsg,
instrument_id: InstrumentId,
ts_init: int,
) -> OrderBookDelta:
if record.action == "T":
) -> OrderBookDelta | TradeTick:
side: OrderSide = parse_order_side(record.side)
if side == OrderSide.NO_ORDER_SIDE:
return TradeTick.from_raw(
instrument_id=instrument_id,
price_raw=record.price,
price_prec=USD.precision, # TODO(per instrument precision)
size_raw=int(record.size * FIXED_SCALAR), # No fractional sizes
size_prec=0, # No fractional units
aggressor_side=parse_aggressor_side(record.side),
aggressor_side=AggressorSide.NO_AGGRESSOR,
trade_id=TradeId(str(record.sequence)),
ts_event=record.ts_recv, # More accurate and reliable timestamp
ts_init=ts_init,
)

action: BookAction = parse_book_action(record.action)
side: OrderSide = parse_order_side(record.side)
if side == OrderSide.NO_ORDER_SIDE:
if record.action == "T":
return TradeTick.from_raw(
instrument_id=instrument_id,
price_raw=record.price,
price_prec=USD.precision, # TODO(per instrument precision)
size_raw=int(record.size * FIXED_SCALAR), # No fractional sizes
size_prec=0, # No fractional units
aggressor_side=AggressorSide.NO_AGGRESSOR,
aggressor_side=parse_aggressor_side(record.side),
trade_id=TradeId(str(record.sequence)),
ts_event=record.ts_recv, # More accurate and reliable timestamp
ts_init=ts_init,
)

return OrderBookDelta.from_raw(
instrument_id=instrument_id,
action=action,
action=parse_book_action(record.action),
side=side,
price_raw=record.price,
price_prec=USD.precision, # TODO(per instrument precision)
Expand Down

0 comments on commit 4c4b4a7

Please sign in to comment.