Skip to content

Commit

Permalink
Implement Databento parent symbols on startup
Browse files Browse the repository at this point in the history
  • Loading branch information
cjdsellers committed Feb 12, 2024
1 parent e78bb97 commit f11de01
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 2 deletions.
7 changes: 6 additions & 1 deletion nautilus_trader/adapters/databento/data.py
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,12 @@ async def _connect(self) -> None:
coros: list[Coroutine] = []
for dataset, instrument_ids in self._instrument_ids.items():
loading_ids: list[InstrumentId] = sorted(instrument_ids)
coros.append(self._instrument_provider.load_ids_async(instrument_ids=loading_ids))
filters = {"parent_symbols": list(self._parent_symbols.get(dataset, []))}
coro = self._instrument_provider.load_ids_async(
instrument_ids=loading_ids,
filters=filters,
)
coros.append(coro)
await self._subscribe_instrument_ids(dataset, instrument_ids=loading_ids)

try:
Expand Down
14 changes: 13 additions & 1 deletion nautilus_trader/adapters/databento/providers.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
from nautilus_trader.adapters.databento.enums import DatabentoSchema
from nautilus_trader.adapters.databento.loaders import DatabentoDataLoader
from nautilus_trader.common.component import LiveClock
from nautilus_trader.common.enums import LogColor
from nautilus_trader.common.providers import InstrumentProvider
from nautilus_trader.config import InstrumentProviderConfig
from nautilus_trader.core import nautilus_pyo3
Expand Down Expand Up @@ -123,13 +124,15 @@ async def load_ids_async(
publishers_path=str(PUBLISHERS_PATH),
)

parent_symbols = list(filters.get("parent_symbols", [])) if filters is not None else None

pyo3_instruments = []
success_msg = "All instruments received and decoded."

def receive_instruments(pyo3_instrument: Any) -> None:
pyo3_instruments.append(pyo3_instrument)
instrument_ids_to_decode.discard(pyo3_instrument.id.value)
if not instrument_ids_to_decode:
if not parent_symbols and not instrument_ids_to_decode:
raise asyncio.CancelledError(success_msg)

await live_client.subscribe(
Expand All @@ -138,6 +141,15 @@ def receive_instruments(pyo3_instrument: Any) -> None:
start=0, # From start of current week (latest definitions)
)

if parent_symbols:
self._log.info(f"Requesting parent symbols {parent_symbols}.", LogColor.BLUE)
await live_client.subscribe(
schema=DatabentoSchema.DEFINITION.value,
stype_in="parent",
symbols=",".join(parent_symbols),
start=0, # From start of current week (latest definitions)
)

try:
await asyncio.wait_for(live_client.start(callback=receive_instruments), timeout=5.0)
# TODO: Improve this so that `live_client.start` isn't raising a `ValueError`
Expand Down

0 comments on commit f11de01

Please sign in to comment.