Skip to content

Commit

Permalink
Continue Databento integration
Browse files Browse the repository at this point in the history
  • Loading branch information
cjdsellers committed Dec 17, 2023
1 parent 1f11073 commit 01ad45b
Show file tree
Hide file tree
Showing 7 changed files with 132 additions and 52 deletions.
28 changes: 15 additions & 13 deletions examples/live/databento/databento_subscriber.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,13 @@
from nautilus_trader.config import TradingNodeConfig
from nautilus_trader.config.common import StrategyConfig
from nautilus_trader.live.node import TradingNode
from nautilus_trader.model.data import BarType
from nautilus_trader.model.data import OrderBookDeltas
from nautilus_trader.model.data import QuoteTick
from nautilus_trader.model.data import TradeTick
from nautilus_trader.model.enums import BookType
from nautilus_trader.model.identifiers import InstrumentId
from nautilus_trader.model.identifiers import TraderId
from nautilus_trader.model.identifiers import Venue
from nautilus_trader.trading.strategy import Strategy


Expand All @@ -41,15 +41,15 @@
# For correct subscription operation, you must specify all instruments to be immediately
# subscribed for as part of the data client configuration
instrument_ids = [
# InstrumentId.from_str("AAPL.XCHI"),
InstrumentId.from_str("ESZ3.GLBX"),
InstrumentId.from_str("ESM4.GLBX"),
InstrumentId.from_str("AAPL.XCHI"),
# InstrumentId.from_str("ESZ3.GLBX"),
# InstrumentId.from_str("ESM4.GLBX"),
]

# Configure the trading node
config_node = TradingNodeConfig(
trader_id=TraderId("TESTER-001"),
logging=LoggingConfig(log_level="INFO"),
logging=LoggingConfig(log_level="DEBUG"),
exec_engine=LiveExecEngineConfig(
reconciliation=False, # Not applicable
inflight_check_interval_ms=0, # Not applicable
Expand Down Expand Up @@ -141,15 +141,17 @@ def on_start(self) -> None:
instrument_id=instrument_id,
book_type=BookType.L2_MBP,
depth=10,
client_id=DATABENTO_CLIENT_ID,
)
# self.subscribe_quote_ticks(instrument_id, client_id=DATABENTO_CLIENT_ID)
# self.subscribe_trade_ticks(instrument_id, client_id=DATABENTO_CLIENT_ID)
# self.request_quote_ticks(instrument_id)
# self.request_trade_ticks(instrument_id)
# self.request_bars(BarType.from_str(f"{instrument_id}-1-MINUTE-LAST-EXTERNAL"))

self.request_instruments(venue=Venue("XCHI"), client_id=DATABENTO_CLIENT_ID)
self.request_instruments(venue=Venue("XNAS"), client_id=DATABENTO_CLIENT_ID)
self.subscribe_quote_ticks(instrument_id, client_id=DATABENTO_CLIENT_ID)
self.subscribe_trade_ticks(instrument_id, client_id=DATABENTO_CLIENT_ID)
self.request_quote_ticks(instrument_id)
self.request_trade_ticks(instrument_id)
self.request_bars(BarType.from_str(f"{instrument_id}-1-MINUTE-LAST-EXTERNAL"))

# self.request_instruments(venue=Venue("OPRA"), client_id=DATABENTO_CLIENT_ID)
# self.request_instruments(venue=Venue("XCHI"), client_id=DATABENTO_CLIENT_ID)
# self.request_instruments(venue=Venue("XNAS"), client_id=DATABENTO_CLIENT_ID)

def on_stop(self) -> None:
"""
Expand Down
6 changes: 5 additions & 1 deletion nautilus_trader/adapters/databento/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,14 @@

from typing import Final

import pandas as pd

from nautilus_trader.model.identifiers import ClientId


DATABENTO: Final[str] = "DATABENTO"
DATABENTO_CLIENT_ID: Final[ClientId] = ClientId(DATABENTO)

ALL_SYMBOLS = "ALL_SYMBOLS"
ALL_SYMBOLS: Final[str] = "ALL_SYMBOLS"

ONE_DAY: Final[pd.Timedelta] = pd.Timedelta(days=1)
101 changes: 69 additions & 32 deletions nautilus_trader/adapters/databento/data.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,20 +21,24 @@

import databento
import pandas as pd
import pytz

from nautilus_trader.adapters.databento.common import databento_schema_from_nautilus_bar_type
from nautilus_trader.adapters.databento.config import DatabentoDataClientConfig
from nautilus_trader.adapters.databento.constants import ALL_SYMBOLS
from nautilus_trader.adapters.databento.constants import DATABENTO_CLIENT_ID
from nautilus_trader.adapters.databento.constants import ONE_DAY
from nautilus_trader.adapters.databento.loaders import DatabentoDataLoader
from nautilus_trader.adapters.databento.parsing import parse_record
from nautilus_trader.adapters.databento.parsing import parse_record_with_metadata
from nautilus_trader.adapters.databento.types import Dataset
from nautilus_trader.adapters.databento.types import PublisherId
from nautilus_trader.cache.cache import Cache
from nautilus_trader.common.clock import LiveClock
from nautilus_trader.common.component import MessageBus
from nautilus_trader.common.enums import LogColor
from nautilus_trader.common.logging import Logger
from nautilus_trader.core.nautilus_pyo3 import is_within_last_24_hours
from nautilus_trader.core.nautilus_pyo3 import last_weekday_nanos
from nautilus_trader.core.uuid import UUID4
from nautilus_trader.live.data_client import LiveMarketDataClient
Expand Down Expand Up @@ -210,7 +214,7 @@ def _check_live_client_started(self, dataset: Dataset, live_client: databento.Li

async def _ensure_subscribed_for_instrument(self, instrument_id: InstrumentId) -> None:
dataset: Dataset = self._loader.get_dataset_for_venue(instrument_id.venue)
subscribed_instruments = self._instrument_ids.get(dataset)
subscribed_instruments = self._instrument_ids[dataset]

if instrument_id in subscribed_instruments:
return
Expand All @@ -234,7 +238,7 @@ def _load_instrument_ids(self, dataset: Dataset, instrument_ids: list[Instrument
)
for record in live_client:
if isinstance(record, databento.InstrumentDefMsg):
instrument = parse_record(record, self._loader.publishers())
instrument = parse_record_with_metadata(record, self._loader.publishers())
self._handle_data(instrument)

instrument_ids_to_decode.discard(instrument.id)
Expand Down Expand Up @@ -494,7 +498,7 @@ async def _request_instrument(
) -> None:
dataset: Dataset = self._loader.get_dataset_for_venue(instrument_id.venue)
date_now_utc = self._clock.utc_now().date()
start = last_weekday_nanos( # Use midnight (UTC) of last weekday
start = last_weekday_nanos(
year=date_now_utc.year,
month=date_now_utc.month,
day=date_now_utc.day,
Expand All @@ -510,7 +514,7 @@ async def _request_instrument(
for record in data:
instrument = parse_record(
record=record,
publishers=self._loader.publishers(),
instrument_id=instrument_id,
)

self._handle_instrument(
Expand All @@ -524,19 +528,23 @@ async def _request_instruments(
correlation_id: UUID4,
) -> None:
dataset: Dataset = self._loader.get_dataset_for_venue(venue)

date_now_utc = self._clock.utc_now().date()
start = (
last_weekday_nanos( # Use midnight (UTC) of last weekday
default_start = pd.Timestamp(
last_weekday_nanos(
year=date_now_utc.year,
month=date_now_utc.month,
day=date_now_utc.day,
)
- pd.Timedelta(days=1).value
),
tz=pytz.utc,
)

if is_within_last_24_hours(default_start.value):
default_start -= ONE_DAY

data = await self._http_client.timeseries.get_range_async(
dataset=dataset,
start=pd.Timestamp(start).date().isoformat(),
start=default_start.date().isoformat(),
symbols=ALL_SYMBOLS,
schema=databento.Schema.DEFINITION,
)
Expand All @@ -545,7 +553,7 @@ async def _request_instruments(

for record in data:
try:
instrument = parse_record(
instrument = parse_record_with_metadata(
record=record,
publishers=self._loader.publishers(),
)
Expand All @@ -570,25 +578,35 @@ async def _request_quote_ticks(
end: pd.Timestamp | None = None,
) -> None:
dataset: Dataset = self._loader.get_dataset_for_venue(instrument_id.venue)

date_now_utc = self._clock.utc_now().date()
default_start = pd.Timestamp(
last_weekday_nanos(
year=date_now_utc.year,
month=date_now_utc.month,
day=date_now_utc.day,
),
tz=pytz.utc,
)

if is_within_last_24_hours(default_start.value):
default_start -= ONE_DAY

data = await self._http_client.timeseries.get_range_async(
dataset=dataset,
start=start or (self._clock.utc_now().date() - pd.Timedelta(days=1)).isoformat(),
start=start or default_start.date().isoformat(),
end=end,
symbols=instrument_id.symbol.value,
schema=databento.Schema.MBP_1,
limit=limit,
)

instrument_map = databento.InstrumentMap()
instrument_map.insert_metadata(metadata=data.metadata)

ticks: list[QuoteTick] = []

for record in data:
tick = parse_record(
record=record,
publishers=self._loader.publishers(),
instrument_map=instrument_map,
instrument_id=instrument_id,
)

if not isinstance(tick, QuoteTick):
Expand All @@ -612,25 +630,35 @@ async def _request_trade_ticks(
end: pd.Timestamp | None = None,
) -> None:
dataset: Dataset = self._loader.get_dataset_for_venue(instrument_id.venue)

date_now_utc = self._clock.utc_now().date()
default_start = pd.Timestamp(
last_weekday_nanos(
year=date_now_utc.year,
month=date_now_utc.month,
day=date_now_utc.day,
),
tz=pytz.utc,
)

if is_within_last_24_hours(default_start.value):
default_start -= ONE_DAY

data = await self._http_client.timeseries.get_range_async(
dataset=dataset,
start=start or (self._clock.utc_now().date() - pd.Timedelta(days=1)).isoformat(),
start=start or default_start.date().isoformat(),
end=end,
symbols=instrument_id.symbol.value,
schema=databento.Schema.TRADES,
limit=limit,
)

instrument_map = databento.InstrumentMap()
instrument_map.insert_metadata(metadata=data.metadata)

ticks: list[TradeTick] = []

for record in data:
tick = parse_record(
record=record,
publishers=self._loader.publishers(),
instrument_map=instrument_map,
instrument_id=instrument_id,
)

ticks.append(tick)
Expand All @@ -649,33 +677,42 @@ async def _request_bars(
start: pd.Timestamp | None = None,
end: pd.Timestamp | None = None,
) -> None:
dataset: Dataset = self._loader.get_dataset_for_venue(bar_type.instrument_id.venue)

try:
schema = databento_schema_from_nautilus_bar_type(bar_type)
except ValueError as e:
self._log.error(f"Cannot request: {e}")
return

dataset: Dataset = self._loader.get_dataset_for_venue(bar_type.instrument_id.venue)

date_now_utc = self._clock.utc_now().date()
default_start = pd.Timestamp(
last_weekday_nanos(
year=date_now_utc.year,
month=date_now_utc.month,
day=date_now_utc.day,
),
tz=pytz.utc,
)

if is_within_last_24_hours(default_start.value):
default_start -= ONE_DAY

data = await self._http_client.timeseries.get_range_async(
dataset=dataset,
start=start or (self._clock.utc_now().date() - pd.Timedelta(days=1)).isoformat(),
start=start or default_start.date().isoformat(),
end=end,
symbols=bar_type.instrument_id.symbol.value,
schema=schema,
limit=limit,
)

instrument_map = databento.InstrumentMap()
instrument_map.insert_metadata(metadata=data.metadata)

bars: list[Bar] = []

for record in data:
bar = parse_record(
record=record,
publishers=self._loader.publishers(),
instrument_map=instrument_map,
instrument_id=bar_type.instrument_id,
)

bars.append(bar)
Expand All @@ -697,7 +734,7 @@ def _handle_record(self, record: databento.DBNRecord) -> None:
self._log.info(f"SystemMsg: {record.msg}")
return

instrument_map = self._instrument_maps.get(0) # Still hard coded for now
instrument_map = self._instrument_maps.get(record.publisher_id)
if not instrument_map:
instrument_map = databento.InstrumentMap()
self._instrument_maps[record.publisher_id] = instrument_map
Expand All @@ -706,7 +743,7 @@ def _handle_record(self, record: databento.DBNRecord) -> None:
instrument_map.insert_symbol_mapping_msg(record)
return

data = parse_record(record, self._loader.publishers(), instrument_map)
data = parse_record_with_metadata(record, self._loader.publishers(), instrument_map)
except ValueError as e:
self._log.error(f"{e!r}")
return
Expand Down
4 changes: 2 additions & 2 deletions nautilus_trader/adapters/databento/loaders.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import msgspec

from nautilus_trader.adapters.databento.common import check_file_path
from nautilus_trader.adapters.databento.parsing import parse_record
from nautilus_trader.adapters.databento.parsing import parse_record_with_metadata
from nautilus_trader.adapters.databento.types import DatabentoPublisher
from nautilus_trader.core.correctness import PyCondition
from nautilus_trader.core.data import Data
Expand Down Expand Up @@ -227,7 +227,7 @@ def from_dbn(self, path: PathLike[str] | str) -> list[Data]:
output: list[Data] = []

for record in store:
data = parse_record(
data = parse_record_with_metadata(
record=record,
publishers=self._publishers,
instrument_map=instrument_map,
Expand Down
12 changes: 11 additions & 1 deletion nautilus_trader/adapters/databento/parsing.py
Original file line number Diff line number Diff line change
Expand Up @@ -351,7 +351,7 @@ def parse_ohlcv_msg(
)


def parse_record(
def parse_record_with_metadata(
record: databento.DBNRecord,
publishers: dict[int, DatabentoPublisher],
instrument_map: databento.InstrumentMap | None = None,
Expand All @@ -375,6 +375,16 @@ def parse_record(
publisher=publisher,
)

return parse_record(
record=record,
instrument_id=instrument_id,
)


def parse_record(
record: databento.DBNRecord,
instrument_id: InstrumentId,
) -> Data:
if isinstance(record, databento.MBOMsg):
return parse_mbo_msg(record, instrument_id)
elif isinstance(record, databento.MBP1Msg | databento.MBP10Msg):
Expand Down
6 changes: 3 additions & 3 deletions nautilus_trader/adapters/databento/providers.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import pandas as pd

from nautilus_trader.adapters.databento.loaders import DatabentoDataLoader
from nautilus_trader.adapters.databento.parsing import parse_record
from nautilus_trader.adapters.databento.parsing import parse_record_with_metadata
from nautilus_trader.common.clock import LiveClock
from nautilus_trader.common.logging import Logger
from nautilus_trader.common.providers import InstrumentProvider
Expand Down Expand Up @@ -127,7 +127,7 @@ async def load_ids_async(
)
for record in live_client:
if isinstance(record, databento.InstrumentDefMsg):
instrument = parse_record(record, self._loader.publishers())
instrument = parse_record_with_metadata(record, self._loader.publishers())
self.add(instrument=instrument)
self._log.debug(f"Added instrument {instrument.id}.")

Expand Down Expand Up @@ -208,7 +208,7 @@ async def get_range(
instruments: list[Instrument] = []

for record in data:
instrument = parse_record(record, self._loader.publishers())
instrument = parse_record_with_metadata(record, self._loader.publishers())
instruments.append(instrument)

instruments = sorted(instruments, key=lambda x: x.ts_init)
Expand Down
Loading

0 comments on commit 01ad45b

Please sign in to comment.