From 44fc2da64ff1b7d2b9a18741e21ebe7e07bcdbc8 Mon Sep 17 00:00:00 2001 From: Chris Sellers Date: Thu, 14 Dec 2023 19:56:53 +1100 Subject: [PATCH] Continue Databento integration --- .../live/databento/databento_subscriber.py | 13 +++-- nautilus_trader/adapters/databento/data.py | 52 ++++++++++++------- nautilus_trader/adapters/databento/types.py | 4 ++ 3 files changed, 47 insertions(+), 22 deletions(-) diff --git a/examples/live/databento/databento_subscriber.py b/examples/live/databento/databento_subscriber.py index 0c0b963a643d..fdff89aa3a07 100644 --- a/examples/live/databento/databento_subscriber.py +++ b/examples/live/databento/databento_subscriber.py @@ -28,6 +28,7 @@ from nautilus_trader.model.data import OrderBookDeltas from nautilus_trader.model.data import QuoteTick from nautilus_trader.model.data import TradeTick +from nautilus_trader.model.enums import BookType from nautilus_trader.model.identifiers import InstrumentId from nautilus_trader.model.identifiers import TraderId from nautilus_trader.trading.strategy import Strategy @@ -39,14 +40,14 @@ # For correct subscription operation, you must specify all instruments to be immediately # subscribed for as part of the data client configuration instrument_ids = [ - InstrumentId.from_str("AAPL.XCHI"), - InstrumentId.from_str("ESZ4.GLBX"), + # InstrumentId.from_str("AAPL.XCHI"), + InstrumentId.from_str("ESZ3.GLBX"), ] # Configure the trading node config_node = TradingNodeConfig( trader_id=TraderId("TESTER-001"), - logging=LoggingConfig(log_level="DEBUG"), # For development + logging=LoggingConfig(log_level="INFO"), exec_engine=LiveExecEngineConfig( reconciliation=False, # Not applicable inflight_check_interval_ms=0, # Not applicable @@ -129,7 +130,11 @@ def on_start(self) -> None: """ for instrument_id in self.instrument_ids: - self.subscribe_order_book_deltas(instrument_id, client_id=DATABENTO_CLIENT_ID) + self.subscribe_order_book_deltas( + instrument_id=instrument_id, + book_type=BookType.L3_MBO, + client_id=DATABENTO_CLIENT_ID, + ) self.subscribe_quote_ticks(instrument_id, client_id=DATABENTO_CLIENT_ID) self.subscribe_trade_ticks(instrument_id, client_id=DATABENTO_CLIENT_ID) diff --git a/nautilus_trader/adapters/databento/data.py b/nautilus_trader/adapters/databento/data.py index eb3fdad4be5f..91548f2bcf22 100644 --- a/nautilus_trader/adapters/databento/data.py +++ b/nautilus_trader/adapters/databento/data.py @@ -23,6 +23,8 @@ from nautilus_trader.adapters.databento.constants import DATABENTO_CLIENT_ID from nautilus_trader.adapters.databento.loaders import DatabentoDataLoader from nautilus_trader.adapters.databento.parsing import parse_record +from nautilus_trader.adapters.databento.types import Dataset +from nautilus_trader.adapters.databento.types import PublisherId from nautilus_trader.cache.cache import Cache from nautilus_trader.common.clock import LiveClock from nautilus_trader.common.component import MessageBus @@ -87,19 +89,20 @@ def __init__( # Configuration self._live_api_key: str = config.api_key or http_client.key self._live_gateway: str | None = config.live_gateway - self._datasets: list[str] = config.datasets or [] - self._instrument_ids: dict[str, set[InstrumentId]] = {} + self._datasets: list[Dataset] = config.datasets or [] + self._instrument_ids: dict[Dataset, set[InstrumentId]] = {} + self._instrument_maps: dict[PublisherId, databento.InstrumentMap] = {} self._initial_load_timeout: float | None = config.initial_load_timeout # Clients self._http_client: databento.Historical = http_client - self._live_clients: dict[str, databento.Live] = {} - self._has_subscribed: dict[str, bool] = {} + self._live_clients: dict[Dataset, databento.Live] = {} + self._has_subscribed: dict[Dataset, bool] = {} self._loader = DatabentoDataLoader() # Cache instrument index for instrument_id in config.instrument_ids or []: - dataset: str = self._loader.get_dataset_for_venue(instrument_id.venue) + dataset: Dataset = self._loader.get_dataset_for_venue(instrument_id.venue) if dataset not in self._instrument_ids: self._instrument_ids[dataset] = set() self._instrument_ids[dataset].add(instrument_id) @@ -137,7 +140,7 @@ async def _disconnect(self) -> None: await asyncio.gather(*tasks) - def _get_live_client(self, dataset: str) -> databento.Live: + def _get_live_client(self, dataset: Dataset) -> databento.Live: client = self._live_clients.get(dataset) if client is None: @@ -147,15 +150,15 @@ def _get_live_client(self, dataset: str) -> databento.Live: return client - def _check_live_client_started(self, dataset: str, live_client: databento.Live) -> None: + def _check_live_client_started(self, dataset: Dataset, live_client: databento.Live) -> None: if not self._has_subscribed.get(dataset): - self._log.debug(f"Starting live client for {dataset}...", LogColor.MAGENTA) + self._log.debug(f"Starting {dataset} live client...", LogColor.MAGENTA) live_client.start() self._has_subscribed[dataset] = True self._log.info(f"Started {dataset} live feed.", LogColor.BLUE) async def _ensure_subscribed_for_instrument(self, instrument_id: InstrumentId) -> None: - dataset: str = self._loader.get_dataset_for_venue(instrument_id.venue) + dataset: Dataset = self._loader.get_dataset_for_venue(instrument_id.venue) subscribed_instruments = self._instrument_ids.get(dataset) if not subscribed_instruments: subscribed_instruments = set() @@ -167,7 +170,7 @@ async def _ensure_subscribed_for_instrument(self, instrument_id: InstrumentId) - self._instrument_ids[dataset].add(instrument_id) await self._subscribe_instrument(instrument_id) - def _load_instrument_ids(self, dataset: str, instrument_ids: list[InstrumentId]) -> None: + def _load_instrument_ids(self, dataset: Dataset, instrument_ids: list[InstrumentId]) -> None: instrument_ids_to_decode = set(instrument_ids) # Use fresh live data client for a one off initial instruments load @@ -204,7 +207,7 @@ async def _subscribe_instruments(self) -> None: raise NotImplementedError("Cannot subscribe to all instruments (not currently supported).") async def _subscribe_instrument(self, instrument_id: InstrumentId) -> None: - dataset: str = self._loader.get_dataset_for_venue(instrument_id.venue) + dataset: Dataset = self._loader.get_dataset_for_venue(instrument_id.venue) live_client = self._get_live_client(dataset) live_client.subscribe( dataset=dataset, @@ -221,7 +224,7 @@ async def _subscribe_order_book_deltas( kwargs: dict | None = None, ) -> None: await self._ensure_subscribed_for_instrument(instrument_id) - dataset: str = self._loader.get_dataset_for_venue(instrument_id.venue) + dataset: Dataset = self._loader.get_dataset_for_venue(instrument_id.venue) live_client = self._get_live_client(dataset) live_client.subscribe( dataset=dataset, @@ -250,7 +253,7 @@ async def _subscribe_order_book_snapshots( ) return - dataset: str = self._loader.get_dataset_for_venue(instrument_id.venue) + dataset: Dataset = self._loader.get_dataset_for_venue(instrument_id.venue) live_client = self._get_live_client(dataset) live_client.subscribe( dataset=dataset, @@ -267,7 +270,7 @@ async def _subscribe_ticker(self, instrument_id: InstrumentId) -> None: async def _subscribe_quote_ticks(self, instrument_id: InstrumentId) -> None: await self._ensure_subscribed_for_instrument(instrument_id) - dataset: str = self._loader.get_dataset_for_venue(instrument_id.venue) + dataset: Dataset = self._loader.get_dataset_for_venue(instrument_id.venue) live_client = self._get_live_client(dataset) live_client.subscribe( dataset=dataset, @@ -282,7 +285,7 @@ async def _subscribe_trade_ticks(self, instrument_id: InstrumentId) -> None: await self._ensure_subscribed_for_instrument(instrument_id) - dataset: str = self._loader.get_dataset_for_venue(instrument_id.venue) + dataset: Dataset = self._loader.get_dataset_for_venue(instrument_id.venue) live_client = self._get_live_client(dataset) live_client.subscribe( dataset=dataset, @@ -323,7 +326,7 @@ async def _subscribe_bars(self, bar_type: BarType) -> None: ) return - dataset: str = self._loader.get_dataset_for_venue(bar_type.instrument_id.venue) + dataset: Dataset = self._loader.get_dataset_for_venue(bar_type.instrument_id.venue) live_client = self._get_live_client(dataset) live_client.subscribe( dataset=dataset, @@ -384,8 +387,21 @@ async def _unsubscribe_bars(self, bar_type: BarType) -> None: ) def _handle_record(self, record: databento.DBNRecord) -> None: - self._log.info(f"Received {record}", LogColor.MAGENTA) - data = parse_record(record, self._loader.publishers()) + try: + self._log.debug(f"Received {record}", LogColor.MAGENTA) + instrument_map = self._instrument_maps.get(0) # Hardcode publisher ID for now + if not instrument_map: + instrument_map = databento.InstrumentMap() + self._instrument_maps[0] = instrument_map + + if isinstance(record, databento.SymbolMappingMsg): + instrument_map.insert_symbol_mapping_msg(record) + return + + data = parse_record(record, self._loader.publishers(), instrument_map) + except ValueError as e: + self._log.error(f"{e!r}") + return if isinstance(data, tuple): self._handle_data(data[0]) diff --git a/nautilus_trader/adapters/databento/types.py b/nautilus_trader/adapters/databento/types.py index a098f50a1f3d..eef786036262 100644 --- a/nautilus_trader/adapters/databento/types.py +++ b/nautilus_trader/adapters/databento/types.py @@ -16,6 +16,10 @@ import msgspec +Dataset = str +PublisherId = int + + class DatabentoPublisher(msgspec.Struct, frozen=True): """ Represents a Databento publisher including dataset name and venue.