From fdd461637ec97cf40e2d8fcd6491c7f6525cfbe0 Mon Sep 17 00:00:00 2001 From: Chris Sellers Date: Sat, 16 Dec 2023 23:11:08 +1100 Subject: [PATCH] Continue Databento integration --- .../live/databento/databento_subscriber.py | 4 + .../adapters/databento/constants.py | 2 + nautilus_trader/adapters/databento/data.py | 227 ++++++++++++++++-- nautilus_trader/adapters/databento/parsing.py | 2 +- 4 files changed, 214 insertions(+), 21 deletions(-) diff --git a/examples/live/databento/databento_subscriber.py b/examples/live/databento/databento_subscriber.py index fdff89aa3a07..24a9d9175138 100644 --- a/examples/live/databento/databento_subscriber.py +++ b/examples/live/databento/databento_subscriber.py @@ -137,6 +137,10 @@ def on_start(self) -> None: ) self.subscribe_quote_ticks(instrument_id, client_id=DATABENTO_CLIENT_ID) self.subscribe_trade_ticks(instrument_id, client_id=DATABENTO_CLIENT_ID) + # self.request_quote_ticks(instrument_id) + # self.request_trade_ticks(instrument_id) + # self.request_bars(BarType.from_str(f"{instrument_id}-1-MINUTE-LAST-EXTERNAL")) + # self.request_instruments(instrument_id.venue) def on_stop(self) -> None: """ diff --git a/nautilus_trader/adapters/databento/constants.py b/nautilus_trader/adapters/databento/constants.py index 9a84b5b34564..f60d96563c2a 100644 --- a/nautilus_trader/adapters/databento/constants.py +++ b/nautilus_trader/adapters/databento/constants.py @@ -20,3 +20,5 @@ DATABENTO: Final[str] = "DATABENTO" DATABENTO_CLIENT_ID: Final[ClientId] = ClientId(DATABENTO) + +ALL_SYMBOLS = "ALL_SYMBOLS" diff --git a/nautilus_trader/adapters/databento/data.py b/nautilus_trader/adapters/databento/data.py index 5aeb81c1d48f..1c62eb092ac4 100644 --- a/nautilus_trader/adapters/databento/data.py +++ b/nautilus_trader/adapters/databento/data.py @@ -21,6 +21,7 @@ import pandas as pd from nautilus_trader.adapters.databento.config import DatabentoDataClientConfig +from nautilus_trader.adapters.databento.constants import ALL_SYMBOLS from nautilus_trader.adapters.databento.constants import DATABENTO_CLIENT_ID from nautilus_trader.adapters.databento.loaders import DatabentoDataLoader from nautilus_trader.adapters.databento.parsing import parse_record @@ -35,12 +36,17 @@ from nautilus_trader.core.nautilus_pyo3 import last_weekday_nanos from nautilus_trader.core.uuid import UUID4 from nautilus_trader.live.data_client import LiveMarketDataClient +from nautilus_trader.model.data import Bar from nautilus_trader.model.data import BarType from nautilus_trader.model.data import DataType +from nautilus_trader.model.data import QuoteTick +from nautilus_trader.model.data import TradeTick from nautilus_trader.model.enums import BarAggregation from nautilus_trader.model.enums import BookType +from nautilus_trader.model.enums import PriceType from nautilus_trader.model.identifiers import InstrumentId from nautilus_trader.model.identifiers import Venue +from nautilus_trader.model.instruments import Instrument class DatabentoDataClient(LiveMarketDataClient): @@ -323,6 +329,12 @@ async def _subscribe_bars(self, bar_type: BarType) -> None: ) return + if bar_type.spec.price_type != PriceType.LAST: + self._log.error( + f"Cannot subscribe to {bar_type}: only `LAST` price bars are aggregated by Databento.", + ) + return + if bar_type.spec.step != 1: self._log.error( f"Cannot subscribe to {bar_type}: only a step of 1 is supported.", @@ -357,7 +369,7 @@ async def _subscribe_bars(self, bar_type: BarType) -> None: async def _unsubscribe(self, data_type: DataType) -> None: raise NotImplementedError( - 
f"Cannot unsubscribe from {data_type} (not supported by Databento).", + f"Cannot unsubscribe from {data_type} (not implemented).", ) async def _unsubscribe_instruments(self) -> None: @@ -408,26 +420,82 @@ async def _unsubscribe_bars(self, bar_type: BarType) -> None: async def _request(self, data_type: DataType, correlation_id: UUID4) -> None: raise NotImplementedError( - "method `_request` must be implemented in the subclass", - ) # pragma: no cover + f"Cannot request {data_type} (not implemented).", + ) async def _request_instrument( self, instrument_id: InstrumentId, correlation_id: UUID4, ) -> None: - raise NotImplementedError( - "method `_request_instrument` must be implemented in the subclass", - ) # pragma: no cover + dataset: Dataset = self._loader.get_dataset_for_venue(instrument_id.venue) + date_now_utc = self._clock.utc_now().date() + start = last_weekday_nanos( # Use midnight (UTC) of last weekday + year=date_now_utc.year, + month=date_now_utc.month, + day=date_now_utc.day, + ) + + data = await self._http_client.timeseries.get_range_async( + dataset=dataset, + start=start, + symbols=instrument_id.symbol.value, + schema=databento.Schema.DEFINITION, + ) + + for record in data: + instrument = parse_record( + record=record, + publishers=self._loader.publishers(), + ) + + self._handle_instrument( + instrument=instrument, + correlation_id=correlation_id, + ) async def _request_instruments( self, venue: Venue, correlation_id: UUID4, ) -> None: - raise NotImplementedError( - "method `_request_instruments` must be implemented in the subclass", - ) # pragma: no cover + dataset: Dataset = self._loader.get_dataset_for_venue(venue) + date_now_utc = self._clock.utc_now().date() + start = ( + last_weekday_nanos( # Use midnight (UTC) of last weekday + year=date_now_utc.year, + month=date_now_utc.month, + day=date_now_utc.day, + ) + - pd.Timedelta(days=1).value + ) + + data = await self._http_client.timeseries.get_range_async( + dataset=dataset, + start=pd.Timestamp(start).date().isoformat(), + symbols=ALL_SYMBOLS, + schema=databento.Schema.DEFINITION, + ) + + instruments: list[Instrument] = [] + + for record in data: + try: + instrument = parse_record( + record=record, + publishers=self._loader.publishers(), + ) + except ValueError as ex: + self._log.error(repr(ex)) + continue + + instruments.append(instrument) + + self._handle_instruments( + instruments=instruments, + venue=venue, + correlation_id=correlation_id, + ) async def _request_quote_ticks( self, @@ -437,9 +505,39 @@ async def _request_quote_ticks( start: pd.Timestamp | None = None, end: pd.Timestamp | None = None, ) -> None: - raise NotImplementedError( - "method `_request_quote_tick` must be implemented in the subclass", - ) # pragma: no cover + dataset: Dataset = self._loader.get_dataset_for_venue(instrument_id.venue) + data = await self._http_client.timeseries.get_range_async( + dataset=dataset, + start=start or (self._clock.utc_now().date() - pd.Timedelta(days=1)).isoformat(), + end=end, + symbols=instrument_id.symbol.value, + schema=databento.Schema.MBP_1, + limit=limit, + ) + + instrument_map = databento.InstrumentMap() + instrument_map.insert_metadata(metadata=data.metadata) + + ticks: list[QuoteTick] = [] + + for record in data: + tick = parse_record( + record=record, + publishers=self._loader.publishers(), + instrument_map=instrument_map, + ) + + if not isinstance(tick, QuoteTick): + # Might be `TradeTick` + continue + + ticks.append(tick) + + self._handle_quote_ticks( + instrument_id=instrument_id, + ticks=ticks, + 
correlation_id=correlation_id,
+        )
 
     async def _request_trade_ticks(
         self,
@@ -449,9 +547,35 @@ async def _request_trade_ticks(
         start: pd.Timestamp | None = None,
         end: pd.Timestamp | None = None,
     ) -> None:
-        raise NotImplementedError(
-            "method `_request_trade_ticks` must be implemented in the subclass",
-        )  # pragma: no cover
+        dataset: Dataset = self._loader.get_dataset_for_venue(instrument_id.venue)
+        data = await self._http_client.timeseries.get_range_async(
+            dataset=dataset,
+            start=start or (self._clock.utc_now().date() - pd.Timedelta(days=1)).isoformat(),
+            end=end,
+            symbols=instrument_id.symbol.value,
+            schema=databento.Schema.TRADES,
+            limit=limit,
+        )
+
+        instrument_map = databento.InstrumentMap()
+        instrument_map.insert_metadata(metadata=data.metadata)
+
+        ticks: list[TradeTick] = []
+
+        for record in data:
+            tick = parse_record(
+                record=record,
+                publishers=self._loader.publishers(),
+                instrument_map=instrument_map,
+            )
+
+            ticks.append(tick)
+
+        self._handle_trade_ticks(
+            instrument_id=instrument_id,
+            ticks=ticks,
+            correlation_id=correlation_id,
+        )
 
     async def _request_bars(
         self,
@@ -461,9 +585,72 @@ async def _request_bars(
         start: pd.Timestamp | None = None,
         end: pd.Timestamp | None = None,
     ) -> None:
-        raise NotImplementedError(
-            "method `_request_bars` must be implemented in the subclass",
-        )  # pragma: no cover
+        PyCondition.true(bar_type.is_externally_aggregated(), "aggregation_source is not EXTERNAL")
+
+        if not bar_type.spec.is_time_aggregated():
+            self._log.error(
+                f"Cannot request {bar_type}: only time bars are aggregated by Databento.",
+            )
+            return
+
+        if bar_type.spec.price_type != PriceType.LAST:
+            self._log.error(
+                f"Cannot request {bar_type}: only `LAST` price bars are aggregated by Databento.",
+            )
+            return
+
+        if bar_type.spec.step != 1:
+            self._log.error(
+                f"Cannot request {bar_type}: only a step of 1 is supported.",
+            )
+
+        dataset: Dataset = self._loader.get_dataset_for_venue(bar_type.instrument_id.venue)
+
+        match bar_type.spec.aggregation:
+            case BarAggregation.SECOND:
+                schema = databento.Schema.OHLCV_1S
+            case BarAggregation.MINUTE:
+                schema = databento.Schema.OHLCV_1M
+            case BarAggregation.HOUR:
+                schema = databento.Schema.OHLCV_1H
+            case BarAggregation.DAY:
+                schema = databento.Schema.OHLCV_1D
+            case _:
+                self._log.error(
+                    f"Cannot request {bar_type}: "
+                    "use either 'SECOND', 'MINUTE', 'HOUR' or 'DAY' aggregations.",
+                )
+                return
+
+        data = await self._http_client.timeseries.get_range_async(
+            dataset=dataset,
+            start=start or (self._clock.utc_now().date() - pd.Timedelta(days=1)).isoformat(),
+            end=end,
+            symbols=bar_type.instrument_id.symbol.value,
+            schema=schema,
+            limit=limit,
+        )
+
+        instrument_map = databento.InstrumentMap()
+        instrument_map.insert_metadata(metadata=data.metadata)
+
+        bars: list[Bar] = []
+
+        for record in data:
+            bar = parse_record(
+                record=record,
+                publishers=self._loader.publishers(),
+                instrument_map=instrument_map,
+            )
+
+            bars.append(bar)
+
+        self._handle_bars(
+            bar_type=bar_type,
+            bars=bars,
+            partial=None,  # No partials
+            correlation_id=correlation_id,
+        )
 
     def _handle_record(self, record: databento.DBNRecord) -> None:
         try:
@@ -475,10 +662,10 @@ def _handle_record(self, record: databento.DBNRecord) -> None:
             self._log.info(f"SystemMsg: {record.msg}")
             return
 
-        instrument_map = self._instrument_maps.get(0)  # Hardcode publisher ID for now
+        instrument_map = self._instrument_maps.get(record.publisher_id)
         if not instrument_map:
             instrument_map = databento.InstrumentMap()
-            
self._instrument_maps[0] = instrument_map + self._instrument_maps[record.publisher_id] = instrument_map if isinstance(record, databento.SymbolMappingMsg): instrument_map.insert_symbol_mapping_msg(record) diff --git a/nautilus_trader/adapters/databento/parsing.py b/nautilus_trader/adapters/databento/parsing.py index 1cfa15b7f34b..1abd9cc63c27 100644 --- a/nautilus_trader/adapters/databento/parsing.py +++ b/nautilus_trader/adapters/databento/parsing.py @@ -409,7 +409,7 @@ def parse_instrument_def( case DatabentoInstrumentClass.FX_SPOT.value: raise ValueError("`instrument_class` FX_SPOT not currently supported") case DatabentoInstrumentClass.OPTION_SPREAD.value: - raise ValueError("`instrument_class` OPTION_SPREAD not currently supported") + return parse_options_contract(record, instrument_id) case DatabentoInstrumentClass.MIXED_SPREAD.value: raise ValueError("`instrument_class` MIXED_SPREAD not currently supported") case _:
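
For reference, below is a minimal sketch of how a strategy could exercise the new historical request paths once this patch is applied, mirroring the commented-out calls added to examples/live/databento/databento_subscriber.py. The instrument ID is an illustrative assumption (not part of the patch), and the trading node and Databento data client are assumed to be wired up as in that example script; passing client_id explicitly just routes the requests to the Databento client.

```python
# Illustrative only: assumes a running node with the Databento data client
# configured as in examples/live/databento/databento_subscriber.py.
from nautilus_trader.adapters.databento.constants import DATABENTO_CLIENT_ID
from nautilus_trader.model.data import BarType
from nautilus_trader.model.identifiers import InstrumentId
from nautilus_trader.trading.strategy import Strategy


class DatabentoRequester(Strategy):
    def on_start(self) -> None:
        # Hypothetical CME Globex symbol, for illustration only
        instrument_id = InstrumentId.from_str("ESH4.GLBX")

        # Historical quotes (MBP-1) and trades via the Databento timeseries HTTP API
        self.request_quote_ticks(instrument_id, client_id=DATABENTO_CLIENT_ID)
        self.request_trade_ticks(instrument_id, client_id=DATABENTO_CLIENT_ID)

        # Bars must be externally aggregated, step-1, LAST-price time bars
        # (SECOND, MINUTE, HOUR or DAY), per the checks in `_request_bars`
        bar_type = BarType.from_str(f"{instrument_id}-1-MINUTE-LAST-EXTERNAL")
        self.request_bars(bar_type, client_id=DATABENTO_CLIENT_ID)

        # Instrument definitions for the whole venue (fetched with ALL_SYMBOLS)
        self.request_instruments(instrument_id.venue, client_id=DATABENTO_CLIENT_ID)
```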