diff --git a/examples/live/databento/databento_subscriber.py b/examples/live/databento/databento_subscriber.py index 7b9ce4b2065e..45f439a53c3b 100644 --- a/examples/live/databento/databento_subscriber.py +++ b/examples/live/databento/databento_subscriber.py @@ -25,13 +25,13 @@ from nautilus_trader.config import TradingNodeConfig from nautilus_trader.config.common import StrategyConfig from nautilus_trader.live.node import TradingNode +from nautilus_trader.model.data import BarType from nautilus_trader.model.data import OrderBookDeltas from nautilus_trader.model.data import QuoteTick from nautilus_trader.model.data import TradeTick from nautilus_trader.model.enums import BookType from nautilus_trader.model.identifiers import InstrumentId from nautilus_trader.model.identifiers import TraderId -from nautilus_trader.model.identifiers import Venue from nautilus_trader.trading.strategy import Strategy @@ -41,15 +41,15 @@ # For correct subscription operation, you must specify all instruments to be immediately # subscribed for as part of the data client configuration instrument_ids = [ - # InstrumentId.from_str("AAPL.XCHI"), - InstrumentId.from_str("ESZ3.GLBX"), - InstrumentId.from_str("ESM4.GLBX"), + InstrumentId.from_str("AAPL.XCHI"), + # InstrumentId.from_str("ESZ3.GLBX"), + # InstrumentId.from_str("ESM4.GLBX"), ] # Configure the trading node config_node = TradingNodeConfig( trader_id=TraderId("TESTER-001"), - logging=LoggingConfig(log_level="INFO"), + logging=LoggingConfig(log_level="DEBUG"), exec_engine=LiveExecEngineConfig( reconciliation=False, # Not applicable inflight_check_interval_ms=0, # Not applicable @@ -141,15 +141,17 @@ def on_start(self) -> None: instrument_id=instrument_id, book_type=BookType.L2_MBP, depth=10, + client_id=DATABENTO_CLIENT_ID, ) - # self.subscribe_quote_ticks(instrument_id, client_id=DATABENTO_CLIENT_ID) - # self.subscribe_trade_ticks(instrument_id, client_id=DATABENTO_CLIENT_ID) - # self.request_quote_ticks(instrument_id) - # self.request_trade_ticks(instrument_id) - # self.request_bars(BarType.from_str(f"{instrument_id}-1-MINUTE-LAST-EXTERNAL")) - - self.request_instruments(venue=Venue("XCHI"), client_id=DATABENTO_CLIENT_ID) - self.request_instruments(venue=Venue("XNAS"), client_id=DATABENTO_CLIENT_ID) + self.subscribe_quote_ticks(instrument_id, client_id=DATABENTO_CLIENT_ID) + self.subscribe_trade_ticks(instrument_id, client_id=DATABENTO_CLIENT_ID) + self.request_quote_ticks(instrument_id) + self.request_trade_ticks(instrument_id) + self.request_bars(BarType.from_str(f"{instrument_id}-1-MINUTE-LAST-EXTERNAL")) + + # self.request_instruments(venue=Venue("OPRA"), client_id=DATABENTO_CLIENT_ID) + # self.request_instruments(venue=Venue("XCHI"), client_id=DATABENTO_CLIENT_ID) + # self.request_instruments(venue=Venue("XNAS"), client_id=DATABENTO_CLIENT_ID) def on_stop(self) -> None: """ diff --git a/nautilus_trader/adapters/databento/constants.py b/nautilus_trader/adapters/databento/constants.py index f60d96563c2a..8d4136d44de2 100644 --- a/nautilus_trader/adapters/databento/constants.py +++ b/nautilus_trader/adapters/databento/constants.py @@ -15,10 +15,14 @@ from typing import Final +import pandas as pd + from nautilus_trader.model.identifiers import ClientId DATABENTO: Final[str] = "DATABENTO" DATABENTO_CLIENT_ID: Final[ClientId] = ClientId(DATABENTO) -ALL_SYMBOLS = "ALL_SYMBOLS" +ALL_SYMBOLS: Final[str] = "ALL_SYMBOLS" + +ONE_DAY: Final[pd.Timedelta] = pd.Timedelta(days=1) diff --git a/nautilus_trader/adapters/databento/data.py b/nautilus_trader/adapters/databento/data.py index 57beafa1074e..84a87a8638d4 100644 --- a/nautilus_trader/adapters/databento/data.py +++ b/nautilus_trader/adapters/databento/data.py @@ -21,13 +21,16 @@ import databento import pandas as pd +import pytz from nautilus_trader.adapters.databento.common import databento_schema_from_nautilus_bar_type from nautilus_trader.adapters.databento.config import DatabentoDataClientConfig from nautilus_trader.adapters.databento.constants import ALL_SYMBOLS from nautilus_trader.adapters.databento.constants import DATABENTO_CLIENT_ID +from nautilus_trader.adapters.databento.constants import ONE_DAY from nautilus_trader.adapters.databento.loaders import DatabentoDataLoader from nautilus_trader.adapters.databento.parsing import parse_record +from nautilus_trader.adapters.databento.parsing import parse_record_with_metadata from nautilus_trader.adapters.databento.types import Dataset from nautilus_trader.adapters.databento.types import PublisherId from nautilus_trader.cache.cache import Cache @@ -35,6 +38,7 @@ from nautilus_trader.common.component import MessageBus from nautilus_trader.common.enums import LogColor from nautilus_trader.common.logging import Logger +from nautilus_trader.core.nautilus_pyo3 import is_within_last_24_hours from nautilus_trader.core.nautilus_pyo3 import last_weekday_nanos from nautilus_trader.core.uuid import UUID4 from nautilus_trader.live.data_client import LiveMarketDataClient @@ -210,7 +214,7 @@ def _check_live_client_started(self, dataset: Dataset, live_client: databento.Li async def _ensure_subscribed_for_instrument(self, instrument_id: InstrumentId) -> None: dataset: Dataset = self._loader.get_dataset_for_venue(instrument_id.venue) - subscribed_instruments = self._instrument_ids.get(dataset) + subscribed_instruments = self._instrument_ids[dataset] if instrument_id in subscribed_instruments: return @@ -234,7 +238,7 @@ def _load_instrument_ids(self, dataset: Dataset, instrument_ids: list[Instrument ) for record in live_client: if isinstance(record, databento.InstrumentDefMsg): - instrument = parse_record(record, self._loader.publishers()) + instrument = parse_record_with_metadata(record, self._loader.publishers()) self._handle_data(instrument) instrument_ids_to_decode.discard(instrument.id) @@ -494,7 +498,7 @@ async def _request_instrument( ) -> None: dataset: Dataset = self._loader.get_dataset_for_venue(instrument_id.venue) date_now_utc = self._clock.utc_now().date() - start = last_weekday_nanos( # Use midnight (UTC) of last weekday + start = last_weekday_nanos( year=date_now_utc.year, month=date_now_utc.month, day=date_now_utc.day, @@ -510,7 +514,7 @@ async def _request_instrument( for record in data: instrument = parse_record( record=record, - publishers=self._loader.publishers(), + instrument_id=instrument_id, ) self._handle_instrument( @@ -524,19 +528,23 @@ async def _request_instruments( correlation_id: UUID4, ) -> None: dataset: Dataset = self._loader.get_dataset_for_venue(venue) + date_now_utc = self._clock.utc_now().date() - start = ( - last_weekday_nanos( # Use midnight (UTC) of last weekday + default_start = pd.Timestamp( + last_weekday_nanos( year=date_now_utc.year, month=date_now_utc.month, day=date_now_utc.day, - ) - - pd.Timedelta(days=1).value + ), + tz=pytz.utc, ) + if is_within_last_24_hours(default_start.value): + default_start -= ONE_DAY + data = await self._http_client.timeseries.get_range_async( dataset=dataset, - start=pd.Timestamp(start).date().isoformat(), + start=default_start.date().isoformat(), symbols=ALL_SYMBOLS, schema=databento.Schema.DEFINITION, ) @@ -545,7 +553,7 @@ async def _request_instruments( for record in data: try: - instrument = parse_record( + instrument = parse_record_with_metadata( record=record, publishers=self._loader.publishers(), ) @@ -570,25 +578,35 @@ async def _request_quote_ticks( end: pd.Timestamp | None = None, ) -> None: dataset: Dataset = self._loader.get_dataset_for_venue(instrument_id.venue) + + date_now_utc = self._clock.utc_now().date() + default_start = pd.Timestamp( + last_weekday_nanos( + year=date_now_utc.year, + month=date_now_utc.month, + day=date_now_utc.day, + ), + tz=pytz.utc, + ) + + if is_within_last_24_hours(default_start.value): + default_start -= ONE_DAY + data = await self._http_client.timeseries.get_range_async( dataset=dataset, - start=start or (self._clock.utc_now().date() - pd.Timedelta(days=1)).isoformat(), + start=start or default_start.date().isoformat(), end=end, symbols=instrument_id.symbol.value, schema=databento.Schema.MBP_1, limit=limit, ) - instrument_map = databento.InstrumentMap() - instrument_map.insert_metadata(metadata=data.metadata) - ticks: list[QuoteTick] = [] for record in data: tick = parse_record( record=record, - publishers=self._loader.publishers(), - instrument_map=instrument_map, + instrument_id=instrument_id, ) if not isinstance(tick, QuoteTick): @@ -612,25 +630,35 @@ async def _request_trade_ticks( end: pd.Timestamp | None = None, ) -> None: dataset: Dataset = self._loader.get_dataset_for_venue(instrument_id.venue) + + date_now_utc = self._clock.utc_now().date() + default_start = pd.Timestamp( + last_weekday_nanos( + year=date_now_utc.year, + month=date_now_utc.month, + day=date_now_utc.day, + ), + tz=pytz.utc, + ) + + if is_within_last_24_hours(default_start.value): + default_start -= ONE_DAY + data = await self._http_client.timeseries.get_range_async( dataset=dataset, - start=start or (self._clock.utc_now().date() - pd.Timedelta(days=1)).isoformat(), + start=start or default_start.date().isoformat(), end=end, symbols=instrument_id.symbol.value, schema=databento.Schema.TRADES, limit=limit, ) - instrument_map = databento.InstrumentMap() - instrument_map.insert_metadata(metadata=data.metadata) - ticks: list[TradeTick] = [] for record in data: tick = parse_record( record=record, - publishers=self._loader.publishers(), - instrument_map=instrument_map, + instrument_id=instrument_id, ) ticks.append(tick) @@ -649,33 +677,42 @@ async def _request_bars( start: pd.Timestamp | None = None, end: pd.Timestamp | None = None, ) -> None: - dataset: Dataset = self._loader.get_dataset_for_venue(bar_type.instrument_id.venue) - try: schema = databento_schema_from_nautilus_bar_type(bar_type) except ValueError as e: self._log.error(f"Cannot request: {e}") return + dataset: Dataset = self._loader.get_dataset_for_venue(bar_type.instrument_id.venue) + + date_now_utc = self._clock.utc_now().date() + default_start = pd.Timestamp( + last_weekday_nanos( + year=date_now_utc.year, + month=date_now_utc.month, + day=date_now_utc.day, + ), + tz=pytz.utc, + ) + + if is_within_last_24_hours(default_start.value): + default_start -= ONE_DAY + data = await self._http_client.timeseries.get_range_async( dataset=dataset, - start=start or (self._clock.utc_now().date() - pd.Timedelta(days=1)).isoformat(), + start=start or default_start.date().isoformat(), end=end, symbols=bar_type.instrument_id.symbol.value, schema=schema, limit=limit, ) - instrument_map = databento.InstrumentMap() - instrument_map.insert_metadata(metadata=data.metadata) - bars: list[Bar] = [] for record in data: bar = parse_record( record=record, - publishers=self._loader.publishers(), - instrument_map=instrument_map, + instrument_id=bar_type.instrument_id, ) bars.append(bar) @@ -697,7 +734,7 @@ def _handle_record(self, record: databento.DBNRecord) -> None: self._log.info(f"SystemMsg: {record.msg}") return - instrument_map = self._instrument_maps.get(0) # Still hard coded for now + instrument_map = self._instrument_maps.get(record.publisher_id) if not instrument_map: instrument_map = databento.InstrumentMap() self._instrument_maps[record.publisher_id] = instrument_map @@ -706,7 +743,7 @@ def _handle_record(self, record: databento.DBNRecord) -> None: instrument_map.insert_symbol_mapping_msg(record) return - data = parse_record(record, self._loader.publishers(), instrument_map) + data = parse_record_with_metadata(record, self._loader.publishers(), instrument_map) except ValueError as e: self._log.error(f"{e!r}") return diff --git a/nautilus_trader/adapters/databento/loaders.py b/nautilus_trader/adapters/databento/loaders.py index d1d79b1e7960..b3088aa6af3f 100644 --- a/nautilus_trader/adapters/databento/loaders.py +++ b/nautilus_trader/adapters/databento/loaders.py @@ -20,7 +20,7 @@ import msgspec from nautilus_trader.adapters.databento.common import check_file_path -from nautilus_trader.adapters.databento.parsing import parse_record +from nautilus_trader.adapters.databento.parsing import parse_record_with_metadata from nautilus_trader.adapters.databento.types import DatabentoPublisher from nautilus_trader.core.correctness import PyCondition from nautilus_trader.core.data import Data @@ -227,7 +227,7 @@ def from_dbn(self, path: PathLike[str] | str) -> list[Data]: output: list[Data] = [] for record in store: - data = parse_record( + data = parse_record_with_metadata( record=record, publishers=self._publishers, instrument_map=instrument_map, diff --git a/nautilus_trader/adapters/databento/parsing.py b/nautilus_trader/adapters/databento/parsing.py index 1abd9cc63c27..d73a25f58273 100644 --- a/nautilus_trader/adapters/databento/parsing.py +++ b/nautilus_trader/adapters/databento/parsing.py @@ -351,7 +351,7 @@ def parse_ohlcv_msg( ) -def parse_record( +def parse_record_with_metadata( record: databento.DBNRecord, publishers: dict[int, DatabentoPublisher], instrument_map: databento.InstrumentMap | None = None, @@ -375,6 +375,16 @@ def parse_record( publisher=publisher, ) + return parse_record( + record=record, + instrument_id=instrument_id, + ) + + +def parse_record( + record: databento.DBNRecord, + instrument_id: InstrumentId, +) -> Data: if isinstance(record, databento.MBOMsg): return parse_mbo_msg(record, instrument_id) elif isinstance(record, databento.MBP1Msg | databento.MBP10Msg): diff --git a/nautilus_trader/adapters/databento/providers.py b/nautilus_trader/adapters/databento/providers.py index 7660eec8c504..e1cf759c4fdd 100644 --- a/nautilus_trader/adapters/databento/providers.py +++ b/nautilus_trader/adapters/databento/providers.py @@ -19,7 +19,7 @@ import pandas as pd from nautilus_trader.adapters.databento.loaders import DatabentoDataLoader -from nautilus_trader.adapters.databento.parsing import parse_record +from nautilus_trader.adapters.databento.parsing import parse_record_with_metadata from nautilus_trader.common.clock import LiveClock from nautilus_trader.common.logging import Logger from nautilus_trader.common.providers import InstrumentProvider @@ -127,7 +127,7 @@ async def load_ids_async( ) for record in live_client: if isinstance(record, databento.InstrumentDefMsg): - instrument = parse_record(record, self._loader.publishers()) + instrument = parse_record_with_metadata(record, self._loader.publishers()) self.add(instrument=instrument) self._log.debug(f"Added instrument {instrument.id}.") @@ -208,7 +208,7 @@ async def get_range( instruments: list[Instrument] = [] for record in data: - instrument = parse_record(record, self._loader.publishers()) + instrument = parse_record_with_metadata(record, self._loader.publishers()) instruments.append(instrument) instruments = sorted(instruments, key=lambda x: x.ts_init) diff --git a/nautilus_trader/core/nautilus_pyo3.pyi b/nautilus_trader/core/nautilus_pyo3.pyi index a0eef89d7a68..3d2a576e5455 100644 --- a/nautilus_trader/core/nautilus_pyo3.pyi +++ b/nautilus_trader/core/nautilus_pyo3.pyi @@ -146,8 +146,35 @@ def last_weekday_nanos(year: int, month: int, day: int) -> int: ------- int + Raises + ------ + ValueError + If given an invalid date. + + """ + + +def is_within_last_24_hours(timestamp_ns: int) -> bool: + """ + Return whether the given UNIX nanoseconds timestamp is within the last 24 hours. + + Parameters + ---------- + timestamp_ns : int + The UNIX nanoseconds timestamp datum. + + Returns + ------- + bool + + Raises + ------ + ValueError + If `timestamp` is invalid. + """ + def convert_to_snake_case(s: str) -> str: """ Convert the given string from any common case (PascalCase, camelCase, kebab-case, etc.)