diff --git a/nautilus_trader/adapters/databento/data.py b/nautilus_trader/adapters/databento/data.py index 06efffc362c6..470b66eeb0a2 100644 --- a/nautilus_trader/adapters/databento/data.py +++ b/nautilus_trader/adapters/databento/data.py @@ -159,7 +159,12 @@ async def _connect(self) -> None: coros: list[Coroutine] = [] for dataset, instrument_ids in self._instrument_ids.items(): loading_ids: list[InstrumentId] = sorted(instrument_ids) - coros.append(self._instrument_provider.load_ids_async(instrument_ids=loading_ids)) + filters = {"parent_symbols": list(self._parent_symbols.get(dataset, []))} + coro = self._instrument_provider.load_ids_async( + instrument_ids=loading_ids, + filters=filters, + ) + coros.append(coro) await self._subscribe_instrument_ids(dataset, instrument_ids=loading_ids) try: diff --git a/nautilus_trader/adapters/databento/providers.py b/nautilus_trader/adapters/databento/providers.py index 5ff46034dc2c..7b7198c48c41 100644 --- a/nautilus_trader/adapters/databento/providers.py +++ b/nautilus_trader/adapters/databento/providers.py @@ -25,6 +25,7 @@ from nautilus_trader.adapters.databento.enums import DatabentoSchema from nautilus_trader.adapters.databento.loaders import DatabentoDataLoader from nautilus_trader.common.component import LiveClock +from nautilus_trader.common.enums import LogColor from nautilus_trader.common.providers import InstrumentProvider from nautilus_trader.config import InstrumentProviderConfig from nautilus_trader.core import nautilus_pyo3 @@ -123,13 +124,15 @@ async def load_ids_async( publishers_path=str(PUBLISHERS_PATH), ) + parent_symbols = list(filters.get("parent_symbols", [])) if filters is not None else None + pyo3_instruments = [] success_msg = "All instruments received and decoded." def receive_instruments(pyo3_instrument: Any) -> None: pyo3_instruments.append(pyo3_instrument) instrument_ids_to_decode.discard(pyo3_instrument.id.value) - if not instrument_ids_to_decode: + if not parent_symbols and not instrument_ids_to_decode: raise asyncio.CancelledError(success_msg) await live_client.subscribe( @@ -138,6 +141,15 @@ def receive_instruments(pyo3_instrument: Any) -> None: start=0, # From start of current week (latest definitions) ) + if parent_symbols: + self._log.info(f"Requesting parent symbols {parent_symbols}.", LogColor.BLUE) + await live_client.subscribe( + schema=DatabentoSchema.DEFINITION.value, + stype_in="parent", + symbols=",".join(parent_symbols), + start=0, # From start of current week (latest definitions) + ) + try: await asyncio.wait_for(live_client.start(callback=receive_instruments), timeout=5.0) # TODO: Improve this so that `live_client.start` isn't raising a `ValueError`