Skip to content

Commit

Permalink
Continue Databento integration
Browse files Browse the repository at this point in the history
  • Loading branch information
cjdsellers committed Dec 14, 2023
1 parent 71b0b92 commit 44fc2da
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 22 deletions.
13 changes: 9 additions & 4 deletions examples/live/databento/databento_subscriber.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
from nautilus_trader.model.data import OrderBookDeltas
from nautilus_trader.model.data import QuoteTick
from nautilus_trader.model.data import TradeTick
from nautilus_trader.model.enums import BookType
from nautilus_trader.model.identifiers import InstrumentId
from nautilus_trader.model.identifiers import TraderId
from nautilus_trader.trading.strategy import Strategy
Expand All @@ -39,14 +40,14 @@
# For correct subscription operation, you must specify all instruments to be immediately
# subscribed for as part of the data client configuration
instrument_ids = [
InstrumentId.from_str("AAPL.XCHI"),
InstrumentId.from_str("ESZ4.GLBX"),
# InstrumentId.from_str("AAPL.XCHI"),
InstrumentId.from_str("ESZ3.GLBX"),
]

# Configure the trading node
config_node = TradingNodeConfig(
trader_id=TraderId("TESTER-001"),
logging=LoggingConfig(log_level="DEBUG"), # For development
logging=LoggingConfig(log_level="INFO"),
exec_engine=LiveExecEngineConfig(
reconciliation=False, # Not applicable
inflight_check_interval_ms=0, # Not applicable
Expand Down Expand Up @@ -129,7 +130,11 @@ def on_start(self) -> None:
"""
for instrument_id in self.instrument_ids:
self.subscribe_order_book_deltas(instrument_id, client_id=DATABENTO_CLIENT_ID)
self.subscribe_order_book_deltas(
instrument_id=instrument_id,
book_type=BookType.L3_MBO,
client_id=DATABENTO_CLIENT_ID,
)
self.subscribe_quote_ticks(instrument_id, client_id=DATABENTO_CLIENT_ID)
self.subscribe_trade_ticks(instrument_id, client_id=DATABENTO_CLIENT_ID)

Expand Down
52 changes: 34 additions & 18 deletions nautilus_trader/adapters/databento/data.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
from nautilus_trader.adapters.databento.constants import DATABENTO_CLIENT_ID
from nautilus_trader.adapters.databento.loaders import DatabentoDataLoader
from nautilus_trader.adapters.databento.parsing import parse_record
from nautilus_trader.adapters.databento.types import Dataset
from nautilus_trader.adapters.databento.types import PublisherId
from nautilus_trader.cache.cache import Cache
from nautilus_trader.common.clock import LiveClock
from nautilus_trader.common.component import MessageBus
Expand Down Expand Up @@ -87,19 +89,20 @@ def __init__(
# Configuration
self._live_api_key: str = config.api_key or http_client.key
self._live_gateway: str | None = config.live_gateway
self._datasets: list[str] = config.datasets or []
self._instrument_ids: dict[str, set[InstrumentId]] = {}
self._datasets: list[Dataset] = config.datasets or []
self._instrument_ids: dict[Dataset, set[InstrumentId]] = {}
self._instrument_maps: dict[PublisherId, databento.InstrumentMap] = {}
self._initial_load_timeout: float | None = config.initial_load_timeout

# Clients
self._http_client: databento.Historical = http_client
self._live_clients: dict[str, databento.Live] = {}
self._has_subscribed: dict[str, bool] = {}
self._live_clients: dict[Dataset, databento.Live] = {}
self._has_subscribed: dict[Dataset, bool] = {}
self._loader = DatabentoDataLoader()

# Cache instrument index
for instrument_id in config.instrument_ids or []:
dataset: str = self._loader.get_dataset_for_venue(instrument_id.venue)
dataset: Dataset = self._loader.get_dataset_for_venue(instrument_id.venue)
if dataset not in self._instrument_ids:
self._instrument_ids[dataset] = set()
self._instrument_ids[dataset].add(instrument_id)
Expand Down Expand Up @@ -137,7 +140,7 @@ async def _disconnect(self) -> None:

await asyncio.gather(*tasks)

def _get_live_client(self, dataset: str) -> databento.Live:
def _get_live_client(self, dataset: Dataset) -> databento.Live:
client = self._live_clients.get(dataset)

if client is None:
Expand All @@ -147,15 +150,15 @@ def _get_live_client(self, dataset: str) -> databento.Live:

return client

def _check_live_client_started(self, dataset: str, live_client: databento.Live) -> None:
def _check_live_client_started(self, dataset: Dataset, live_client: databento.Live) -> None:
if not self._has_subscribed.get(dataset):
self._log.debug(f"Starting live client for {dataset}...", LogColor.MAGENTA)
self._log.debug(f"Starting {dataset} live client...", LogColor.MAGENTA)
live_client.start()
self._has_subscribed[dataset] = True
self._log.info(f"Started {dataset} live feed.", LogColor.BLUE)

async def _ensure_subscribed_for_instrument(self, instrument_id: InstrumentId) -> None:
dataset: str = self._loader.get_dataset_for_venue(instrument_id.venue)
dataset: Dataset = self._loader.get_dataset_for_venue(instrument_id.venue)
subscribed_instruments = self._instrument_ids.get(dataset)
if not subscribed_instruments:
subscribed_instruments = set()
Expand All @@ -167,7 +170,7 @@ async def _ensure_subscribed_for_instrument(self, instrument_id: InstrumentId) -
self._instrument_ids[dataset].add(instrument_id)
await self._subscribe_instrument(instrument_id)

def _load_instrument_ids(self, dataset: str, instrument_ids: list[InstrumentId]) -> None:
def _load_instrument_ids(self, dataset: Dataset, instrument_ids: list[InstrumentId]) -> None:
instrument_ids_to_decode = set(instrument_ids)

# Use fresh live data client for a one off initial instruments load
Expand Down Expand Up @@ -204,7 +207,7 @@ async def _subscribe_instruments(self) -> None:
raise NotImplementedError("Cannot subscribe to all instruments (not currently supported).")

async def _subscribe_instrument(self, instrument_id: InstrumentId) -> None:
dataset: str = self._loader.get_dataset_for_venue(instrument_id.venue)
dataset: Dataset = self._loader.get_dataset_for_venue(instrument_id.venue)
live_client = self._get_live_client(dataset)
live_client.subscribe(
dataset=dataset,
Expand All @@ -221,7 +224,7 @@ async def _subscribe_order_book_deltas(
kwargs: dict | None = None,
) -> None:
await self._ensure_subscribed_for_instrument(instrument_id)
dataset: str = self._loader.get_dataset_for_venue(instrument_id.venue)
dataset: Dataset = self._loader.get_dataset_for_venue(instrument_id.venue)
live_client = self._get_live_client(dataset)
live_client.subscribe(
dataset=dataset,
Expand Down Expand Up @@ -250,7 +253,7 @@ async def _subscribe_order_book_snapshots(
)
return

dataset: str = self._loader.get_dataset_for_venue(instrument_id.venue)
dataset: Dataset = self._loader.get_dataset_for_venue(instrument_id.venue)
live_client = self._get_live_client(dataset)
live_client.subscribe(
dataset=dataset,
Expand All @@ -267,7 +270,7 @@ async def _subscribe_ticker(self, instrument_id: InstrumentId) -> None:
async def _subscribe_quote_ticks(self, instrument_id: InstrumentId) -> None:
await self._ensure_subscribed_for_instrument(instrument_id)

dataset: str = self._loader.get_dataset_for_venue(instrument_id.venue)
dataset: Dataset = self._loader.get_dataset_for_venue(instrument_id.venue)
live_client = self._get_live_client(dataset)
live_client.subscribe(
dataset=dataset,
Expand All @@ -282,7 +285,7 @@ async def _subscribe_trade_ticks(self, instrument_id: InstrumentId) -> None:

await self._ensure_subscribed_for_instrument(instrument_id)

dataset: str = self._loader.get_dataset_for_venue(instrument_id.venue)
dataset: Dataset = self._loader.get_dataset_for_venue(instrument_id.venue)
live_client = self._get_live_client(dataset)
live_client.subscribe(
dataset=dataset,
Expand Down Expand Up @@ -323,7 +326,7 @@ async def _subscribe_bars(self, bar_type: BarType) -> None:
)
return

dataset: str = self._loader.get_dataset_for_venue(bar_type.instrument_id.venue)
dataset: Dataset = self._loader.get_dataset_for_venue(bar_type.instrument_id.venue)
live_client = self._get_live_client(dataset)
live_client.subscribe(
dataset=dataset,
Expand Down Expand Up @@ -384,8 +387,21 @@ async def _unsubscribe_bars(self, bar_type: BarType) -> None:
)

def _handle_record(self, record: databento.DBNRecord) -> None:
self._log.info(f"Received {record}", LogColor.MAGENTA)
data = parse_record(record, self._loader.publishers())
try:
self._log.debug(f"Received {record}", LogColor.MAGENTA)
instrument_map = self._instrument_maps.get(0) # Hardcode publisher ID for now
if not instrument_map:
instrument_map = databento.InstrumentMap()
self._instrument_maps[0] = instrument_map

if isinstance(record, databento.SymbolMappingMsg):
instrument_map.insert_symbol_mapping_msg(record)
return

data = parse_record(record, self._loader.publishers(), instrument_map)
except ValueError as e:
self._log.error(f"{e!r}")
return

if isinstance(data, tuple):
self._handle_data(data[0])
Expand Down
4 changes: 4 additions & 0 deletions nautilus_trader/adapters/databento/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@
import msgspec


Dataset = str
PublisherId = int


class DatabentoPublisher(msgspec.Struct, frozen=True):
"""
Represents a Databento publisher including dataset name and venue.
Expand Down

0 comments on commit 44fc2da

Please sign in to comment.