Skip to content

Commit

Permalink
Wire up OrderBookDepth10 to DataEngine
Browse files Browse the repository at this point in the history
  • Loading branch information
cjdsellers committed Jan 1, 2024
1 parent e461470 commit 52c8616
Show file tree
Hide file tree
Showing 5 changed files with 127 additions and 4 deletions.
2 changes: 2 additions & 0 deletions nautilus_trader/data/engine.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ from nautilus_trader.model.data cimport InstrumentClose
from nautilus_trader.model.data cimport InstrumentStatus
from nautilus_trader.model.data cimport OrderBookDelta
from nautilus_trader.model.data cimport OrderBookDeltas
from nautilus_trader.model.data cimport OrderBookDepth10
from nautilus_trader.model.data cimport QuoteTick
from nautilus_trader.model.data cimport Ticker
from nautilus_trader.model.data cimport TradeTick
Expand Down Expand Up @@ -146,6 +147,7 @@ cdef class DataEngine(Component):
cpdef void _handle_instrument(self, Instrument instrument)
cpdef void _handle_order_book_delta(self, OrderBookDelta delta)
cpdef void _handle_order_book_deltas(self, OrderBookDeltas deltas)
cpdef void _handle_order_book_depth(self, OrderBookDepth10 depth)
cpdef void _handle_ticker(self, Ticker ticker)
cpdef void _handle_quote_tick(self, QuoteTick tick)
cpdef void _handle_trade_tick(self, TradeTick tick)
Expand Down
23 changes: 23 additions & 0 deletions nautilus_trader/data/engine.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ from nautilus_trader.model.data cimport InstrumentClose
from nautilus_trader.model.data cimport InstrumentStatus
from nautilus_trader.model.data cimport OrderBookDelta
from nautilus_trader.model.data cimport OrderBookDeltas
from nautilus_trader.model.data cimport OrderBookDepth10
from nautilus_trader.model.data cimport QuoteTick
from nautilus_trader.model.data cimport TradeTick
from nautilus_trader.model.data cimport VenueStatus
Expand Down Expand Up @@ -894,6 +895,18 @@ cdef class DataEngine(Component):
priority=10,
)

topic = f"data.book.depth.{instrument_id.venue}.{instrument_id.symbol}"

if not only_deltas and not self._msgbus.is_subscribed(
topic=topic,
handler=self._update_order_book,
):
self._msgbus.subscribe(
topic=topic,
handler=self._update_order_book,
priority=10,
)

cpdef void _handle_subscribe_ticker(
self,
MarketDataClient client,
Expand Down Expand Up @@ -1390,6 +1403,8 @@ cdef class DataEngine(Component):
self._handle_order_book_delta(data)
elif isinstance(data, OrderBookDeltas):
self._handle_order_book_deltas(data)
elif isinstance(data, OrderBookDepth10):
self._handle_order_book_depth(data)
elif isinstance(data, Ticker):
self._handle_ticker(data)
elif isinstance(data, QuoteTick):
Expand Down Expand Up @@ -1440,6 +1455,14 @@ cdef class DataEngine(Component):
msg=deltas,
)

cpdef void _handle_order_book_depth(self, OrderBookDepth10 depth):
self._msgbus.publish_c(
topic=f"data.book.depth"
f".{depth.instrument_id.venue}"
f".{depth.instrument_id.symbol}",
msg=depth,
)

cpdef void _handle_ticker(self, Ticker ticker):
self._cache.add_ticker(ticker)
self._msgbus.publish_c(
Expand Down
2 changes: 2 additions & 0 deletions nautilus_trader/model/book.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ from nautilus_trader.core.rust.model cimport OrderSide
from nautilus_trader.model.data cimport BookOrder
from nautilus_trader.model.data cimport OrderBookDelta
from nautilus_trader.model.data cimport OrderBookDeltas
from nautilus_trader.model.data cimport OrderBookDepth10
from nautilus_trader.model.data cimport QuoteTick
from nautilus_trader.model.data cimport TradeTick
from nautilus_trader.model.objects cimport Price
Expand All @@ -42,6 +43,7 @@ cdef class OrderBook(Data):
cpdef void clear_asks(self, uint64_t ts_event, uint64_t sequence=*)
cpdef void apply_delta(self, OrderBookDelta delta)
cpdef void apply_deltas(self, OrderBookDeltas deltas)
cpdef void apply_depth(self, OrderBookDepth10 depth)
cpdef void apply(self, Data data)
cpdef void check_integrity(self)

Expand Down
28 changes: 24 additions & 4 deletions nautilus_trader/model/book.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ from nautilus_trader.core.rust.model cimport level_price
from nautilus_trader.core.rust.model cimport level_size
from nautilus_trader.core.rust.model cimport orderbook_add
from nautilus_trader.core.rust.model cimport orderbook_apply_delta
from nautilus_trader.core.rust.model cimport orderbook_apply_depth
from nautilus_trader.core.rust.model cimport orderbook_asks
from nautilus_trader.core.rust.model cimport orderbook_best_ask_price
from nautilus_trader.core.rust.model cimport orderbook_best_ask_size
Expand Down Expand Up @@ -79,6 +80,9 @@ from nautilus_trader.core.rust.model cimport vec_levels_drop
from nautilus_trader.core.rust.model cimport vec_orders_drop
from nautilus_trader.core.string cimport cstr_to_pystr
from nautilus_trader.model.data cimport BookOrder
from nautilus_trader.model.data cimport OrderBookDelta
from nautilus_trader.model.data cimport OrderBookDeltas
from nautilus_trader.model.data cimport OrderBookDepth10
from nautilus_trader.model.data cimport TradeTick
from nautilus_trader.model.functions cimport book_type_to_str
from nautilus_trader.model.functions cimport order_side_to_str
Expand Down Expand Up @@ -328,6 +332,20 @@ cdef class OrderBook(Data):
for delta in deltas.deltas:
self.apply_delta(delta)

cpdef void apply_depth(self, OrderBookDepth10 depth):
"""
Apply the depth update to the order book.
Parameters
----------
depth : OrderBookDepth10
The depth update to apply.
"""
Condition.not_none(depth, "depth")

orderbook_apply_depth(&self._mem, depth._mem)

cpdef void apply(self, Data data):
"""
Apply the given data to the order book.
Expand All @@ -338,10 +356,12 @@ cdef class OrderBook(Data):
The data to apply.
"""
if isinstance(data, OrderBookDeltas):
self.apply_deltas(deltas=data)
elif isinstance(data, OrderBookDelta):
self.apply_delta(delta=data)
if isinstance(data, OrderBookDelta):
self.apply_delta(data)
elif isinstance(data, OrderBookDeltas):
self.apply_deltas(data)
elif isinstance(data, OrderBookDepth10):
self.apply_depth(data)
else: # pragma: no-cover (design time error)
raise RuntimeError(f"invalid order book data type, was {type(data)}") # pragma: no-cover (design time error)

Expand Down
76 changes: 76 additions & 0 deletions tests/unit_tests/data/test_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@

BITMEX = Venue("BITMEX")
BINANCE = Venue("BINANCE")
XNAS = Venue("XNAS")
AAPL_XNAS = TestInstrumentProvider.equity()
XBTUSD_BITMEX = TestInstrumentProvider.xbtusd_bitmex()
BTCUSDT_BINANCE = TestInstrumentProvider.btcusdt_binance()
ETHUSDT_BINANCE = TestInstrumentProvider.ethusdt_binance()
Expand Down Expand Up @@ -1147,6 +1149,80 @@ def test_process_order_book_snapshots_when_multiple_subscribers_then_sends_to_re
assert handler1[0] == cached_book
assert handler2[0] == cached_book

def test_process_order_book_depth_when_multiple_subscribers_then_sends_to_registered_handlers(
self,
):
# Arrange
self.data_engine.register_client(self.binance_client)
self.binance_client.start()

self.data_engine.process(AAPL_XNAS) # <-- add necessary instrument for test

handler1 = []
handler2 = []
self.msgbus.subscribe(
topic="data.book.depth.XNAS.AAPL",
handler=handler1.append,
)
self.msgbus.subscribe(
topic="data.book.depth.XNAS.AAPL",
handler=handler2.append,
)

subscribe1 = Subscribe(
client_id=ClientId("DATABENTO"),
venue=BINANCE,
data_type=DataType(
OrderBook,
{
"instrument_id": AAPL_XNAS.id,
"book_type": BookType.L2_MBP,
"depth": 10,
"interval_ms": 1000,
},
),
command_id=UUID4(),
ts_init=self.clock.timestamp_ns(),
)

subscribe2 = Subscribe(
client_id=ClientId(BINANCE.value),
venue=BINANCE,
data_type=DataType(
OrderBook,
{
"instrument_id": AAPL_XNAS.id,
"book_type": BookType.L2_MBP,
"depth": 10,
"interval_ms": 1000,
},
),
command_id=UUID4(),
ts_init=self.clock.timestamp_ns(),
)

self.data_engine.execute(subscribe1)
self.data_engine.execute(subscribe2)

depth = TestDataStubs.order_book_depth10(
instrument_id=AAPL_XNAS.id,
ts_event=1,
)

self.data_engine.process(depth)
events = self.clock.advance_time(2_000_000_000)
events[0].handle()

# Act
self.data_engine.process(depth)

# Assert
cached_book = self.cache.order_book(AAPL_XNAS.id)
assert isinstance(cached_book, OrderBook)
assert cached_book.instrument_id == AAPL_XNAS.id
assert handler1[0] == depth
assert handler2[0] == depth

def test_order_book_delta_creates_book(self):
# Arrange
self.data_engine.register_client(self.betfair)
Expand Down

0 comments on commit 52c8616

Please sign in to comment.