Skip to content

Commit

Permalink
Continue Databento integration
Browse files Browse the repository at this point in the history
  • Loading branch information
cjdsellers committed Dec 16, 2023
1 parent 4dcdd73 commit fdd4616
Show file tree
Hide file tree
Showing 4 changed files with 214 additions and 21 deletions.
4 changes: 4 additions & 0 deletions examples/live/databento/databento_subscriber.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,10 @@ def on_start(self) -> None:
)
self.subscribe_quote_ticks(instrument_id, client_id=DATABENTO_CLIENT_ID)
self.subscribe_trade_ticks(instrument_id, client_id=DATABENTO_CLIENT_ID)
# self.request_quote_ticks(instrument_id)
# self.request_trade_ticks(instrument_id)
# self.request_bars(BarType.from_str(f"{instrument_id}-1-MINUTE-LAST-EXTERNAL"))
# self.request_instruments(instrument_id.venue)

def on_stop(self) -> None:
"""
Expand Down
2 changes: 2 additions & 0 deletions nautilus_trader/adapters/databento/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,5 @@

DATABENTO: Final[str] = "DATABENTO"
DATABENTO_CLIENT_ID: Final[ClientId] = ClientId(DATABENTO)

# Symbol value passed as `symbols=` when requesting definitions for a whole dataset.
ALL_SYMBOLS: Final[str] = "ALL_SYMBOLS"
227 changes: 207 additions & 20 deletions nautilus_trader/adapters/databento/data.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import pandas as pd

from nautilus_trader.adapters.databento.config import DatabentoDataClientConfig
from nautilus_trader.adapters.databento.constants import ALL_SYMBOLS
from nautilus_trader.adapters.databento.constants import DATABENTO_CLIENT_ID
from nautilus_trader.adapters.databento.loaders import DatabentoDataLoader
from nautilus_trader.adapters.databento.parsing import parse_record
Expand All @@ -35,12 +36,17 @@
from nautilus_trader.core.nautilus_pyo3 import last_weekday_nanos
from nautilus_trader.core.uuid import UUID4
from nautilus_trader.live.data_client import LiveMarketDataClient
from nautilus_trader.model.data import Bar
from nautilus_trader.model.data import BarType
from nautilus_trader.model.data import DataType
from nautilus_trader.model.data import QuoteTick
from nautilus_trader.model.data import TradeTick
from nautilus_trader.model.enums import BarAggregation
from nautilus_trader.model.enums import BookType
from nautilus_trader.model.enums import PriceType
from nautilus_trader.model.identifiers import InstrumentId
from nautilus_trader.model.identifiers import Venue
from nautilus_trader.model.instruments import Instrument


class DatabentoDataClient(LiveMarketDataClient):
Expand Down Expand Up @@ -323,6 +329,12 @@ async def _subscribe_bars(self, bar_type: BarType) -> None:
)
return

if bar_type.spec.price_type != PriceType.LAST:
self._log.error(
f"Cannot subscribe to {bar_type}: only `LAST` price bars are aggregated by Databento.",
)
return

if bar_type.spec.step != 1:
self._log.error(
f"Cannot subscribe to {bar_type}: only a step of 1 is supported.",
Expand Down Expand Up @@ -357,7 +369,7 @@ async def _subscribe_bars(self, bar_type: BarType) -> None:

async def _unsubscribe(self, data_type: DataType) -> None:
raise NotImplementedError(
f"Cannot unsubscribe from {data_type} (not supported by Databento).",
f"Cannot unsubscribe from {data_type} (not implemented).",
)

async def _unsubscribe_instruments(self) -> None:
Expand Down Expand Up @@ -408,26 +420,82 @@ async def _unsubscribe_bars(self, bar_type: BarType) -> None:

async def _request(self, data_type: DataType, correlation_id: UUID4) -> None:
raise NotImplementedError(
"method `_request` must be implemented in the subclass",
) # pragma: no cover
f"Cannot request {data_type} (not implemented).",
)

async def _request_instrument(
    self,
    instrument_id: InstrumentId,
    correlation_id: UUID4,
) -> None:
    """
    Request the definition for a single instrument and pass each parsed
    instrument to `_handle_instrument`.

    The request covers the `DEFINITION` schema of the dataset mapped to the
    instrument's venue, starting from the value returned by
    `last_weekday_nanos` for the current UTC date.

    Parameters
    ----------
    instrument_id : InstrumentId
        The instrument ID for the request.
    correlation_id : UUID4
        The correlation ID passed through to the handler.

    """
    dataset: Dataset = self._loader.get_dataset_for_venue(instrument_id.venue)
    date_now_utc = self._clock.utc_now().date()
    start = last_weekday_nanos(  # Use midnight (UTC) of last weekday
        year=date_now_utc.year,
        month=date_now_utc.month,
        day=date_now_utc.day,
    )

    data = await self._http_client.timeseries.get_range_async(
        dataset=dataset,
        start=start,
        symbols=instrument_id.symbol.value,
        schema=databento.Schema.DEFINITION,
    )

    for record in data:
        # Same handling as `_request_instruments`: a definition record which
        # cannot be parsed is logged and skipped rather than aborting the request.
        try:
            instrument = parse_record(
                record=record,
                publishers=self._loader.publishers(),
            )
        except ValueError as ex:
            self._log.error(repr(ex))
            continue

        self._handle_instrument(
            instrument=instrument,
            correlation_id=correlation_id,
        )

async def _request_instruments(
    self,
    venue: Venue,
    correlation_id: UUID4,
) -> None:
    """
    Request all instrument definitions for a venue and pass the parsed
    instruments to `_handle_instruments` in a single call.

    Records which fail to parse with a `ValueError` are logged and skipped.

    Parameters
    ----------
    venue : Venue
        The venue for the request (used to look up the dataset).
    correlation_id : UUID4
        The correlation ID passed through to the handler.

    """
    dataset: Dataset = self._loader.get_dataset_for_venue(venue)
    date_now_utc = self._clock.utc_now().date()
    start = (
        last_weekday_nanos(  # Use midnight (UTC) of last weekday
            year=date_now_utc.year,
            month=date_now_utc.month,
            day=date_now_utc.day,
        )
        - pd.Timedelta(days=1).value  # Step back one further day (in nanoseconds)
    )

    data = await self._http_client.timeseries.get_range_async(
        dataset=dataset,
        start=pd.Timestamp(start).date().isoformat(),  # Sent as an ISO 8601 date string
        symbols=ALL_SYMBOLS,
        schema=databento.Schema.DEFINITION,
    )

    instruments: list[Instrument] = []

    for record in data:
        try:
            instrument = parse_record(
                record=record,
                publishers=self._loader.publishers(),
            )
        except ValueError as ex:
            # Log and skip records which cannot be parsed into an instrument
            self._log.error(repr(ex))
            continue

        instruments.append(instrument)

    self._handle_instruments(
        instruments=instruments,
        venue=venue,
        correlation_id=correlation_id,
    )

async def _request_quote_ticks(
self,
Expand All @@ -437,9 +505,39 @@ async def _request_quote_ticks(
start: pd.Timestamp | None = None,
end: pd.Timestamp | None = None,
) -> None:
raise NotImplementedError(
"method `_request_quote_tick` must be implemented in the subclass",
) # pragma: no cover
dataset: Dataset = self._loader.get_dataset_for_venue(instrument_id.venue)
data = await self._http_client.timeseries.get_range_async(
dataset=dataset,
start=start or (self._clock.utc_now().date() - pd.Timedelta(days=1)).isoformat(),
end=end,
symbols=instrument_id.symbol.value,
schema=databento.Schema.MBP_1,
limit=limit,
)

instrument_map = databento.InstrumentMap()
instrument_map.insert_metadata(metadata=data.metadata)

ticks: list[QuoteTick] = []

for record in data:
tick = parse_record(
record=record,
publishers=self._loader.publishers(),
instrument_map=instrument_map,
)

if not isinstance(tick, QuoteTick):
# Might be `TradeTick`
continue

ticks.append(tick)

self._handle_quote_ticks(
instrument_id=instrument_id,
ticks=ticks,
correlation_id=correlation_id,
)

async def _request_trade_ticks(
self,
Expand All @@ -449,9 +547,35 @@ async def _request_trade_ticks(
start: pd.Timestamp | None = None,
end: pd.Timestamp | None = None,
) -> None:
raise NotImplementedError(
"method `_request_trade_ticks` must be implemented in the subclass",
) # pragma: no cover
dataset: Dataset = self._loader.get_dataset_for_venue(instrument_id.venue)
data = await self._http_client.timeseries.get_range_async(
dataset=dataset,
start=start or (self._clock.utc_now().date() - pd.Timedelta(days=1)).isoformat(),
end=end,
symbols=instrument_id.symbol.value,
schema=databento.Schema.TRADES,
limit=limit,
)

instrument_map = databento.InstrumentMap()
instrument_map.insert_metadata(metadata=data.metadata)

ticks: list[TradeTick] = []

for record in data:
tick = parse_record(
record=record,
publishers=self._loader.publishers(),
instrument_map=instrument_map,
)

ticks.append(tick)

self._handle_trade_ticks(
instrument_id=instrument_id,
ticks=ticks,
correlation_id=correlation_id,
)

async def _request_bars(
self,
Expand All @@ -461,9 +585,72 @@ async def _request_bars(
start: pd.Timestamp | None = None,
end: pd.Timestamp | None = None,
) -> None:
raise NotImplementedError(
"method `_request_bars` must be implemented in the subclass",
) # pragma: no cover
PyCondition.true(bar_type.is_externally_aggregated(), "aggregation_source is not EXTERNAL")

if not bar_type.spec.is_time_aggregated():
self._log.error(
f"Cannot subscribe to {bar_type}: only time bars are aggregated by Databento.",
)
return

if bar_type.spec.price_type != PriceType.LAST:
self._log.error(
f"Cannot subscribe to {bar_type}: only `LAST` price bars are aggregated by Databento.",
)
return

if bar_type.spec.step != 1:
self._log.error(
f"Cannot subscribe to {bar_type}: only a step of 1 is supported.",
)

dataset: Dataset = self._loader.get_dataset_for_venue(bar_type.instrument_id.venue)

match bar_type.spec.aggregation:
case BarAggregation.SECOND:
schema = databento.Schema.OHLCV_1S
case BarAggregation.MINUTE:
schema = databento.Schema.OHLCV_1M
case BarAggregation.HOUR:
schema = databento.Schema.OHLCV_1H
case BarAggregation.DAY:
schema = databento.Schema.OHLCV_1D
case _:
self._log.error(
f"Cannot subscribe to {bar_type}: "
"use either 'SECOND', 'MINTUE', 'HOUR' or 'DAY' aggregations.",
)
return

data = await self._http_client.timeseries.get_range_async(
dataset=dataset,
start=start or (self._clock.utc_now().date() - pd.Timedelta(days=1)).isoformat(),
end=end,
symbols=bar_type.instrument_id.symbol.value,
schema=schema,
limit=limit,
)

instrument_map = databento.InstrumentMap()
instrument_map.insert_metadata(metadata=data.metadata)

bars: list[Bar] = []

for record in data:
bar = parse_record(
record=record,
publishers=self._loader.publishers(),
instrument_map=instrument_map,
)

bars.append(bar)

self._handle_bars(
bar_type=bar_type,
bars=bars,
partial=None, # No partials
correlation_id=correlation_id,
)

def _handle_record(self, record: databento.DBNRecord) -> None:
try:
Expand All @@ -475,10 +662,10 @@ def _handle_record(self, record: databento.DBNRecord) -> None:
self._log.info(f"SystemMsg: {record.msg}")
return

instrument_map = self._instrument_maps.get(0) # Hardcode publisher ID for now
instrument_map = self._instrument_maps.get(record.publisher_id)
if not instrument_map:
instrument_map = databento.InstrumentMap()
self._instrument_maps[0] = instrument_map
self._instrument_maps[record.publisher_id] = instrument_map

if isinstance(record, databento.SymbolMappingMsg):
instrument_map.insert_symbol_mapping_msg(record)
Expand Down
2 changes: 1 addition & 1 deletion nautilus_trader/adapters/databento/parsing.py
Original file line number Diff line number Diff line change
Expand Up @@ -409,7 +409,7 @@ def parse_instrument_def(
case DatabentoInstrumentClass.FX_SPOT.value:
raise ValueError("`instrument_class` FX_SPOT not currently supported")
case DatabentoInstrumentClass.OPTION_SPREAD.value:
raise ValueError("`instrument_class` OPTION_SPREAD not currently supported")
return parse_options_contract(record, instrument_id)
case DatabentoInstrumentClass.MIXED_SPREAD.value:
raise ValueError("`instrument_class` MIXED_SPREAD not currently supported")
case _:
Expand Down

0 comments on commit fdd4616

Please sign in to comment.