From f76d018b112b46cd2a19fa4bc635da7aad90b181 Mon Sep 17 00:00:00 2001 From: Chris Sellers Date: Sat, 9 Dec 2023 16:58:55 +1100 Subject: [PATCH] Continue DatabentoInstrumentProvider --- .../adapters/databento/providers.py | 115 ++++++++++++++---- .../sandbox/sandbox_instrument_provider.py | 12 ++ 2 files changed, 100 insertions(+), 27 deletions(-) diff --git a/nautilus_trader/adapters/databento/providers.py b/nautilus_trader/adapters/databento/providers.py index ed90c06880d6..7660eec8c504 100644 --- a/nautilus_trader/adapters/databento/providers.py +++ b/nautilus_trader/adapters/databento/providers.py @@ -13,7 +13,10 @@ # limitations under the License. # ------------------------------------------------------------------------------------------------- +import datetime as dt + import databento +import pandas as pd from nautilus_trader.adapters.databento.loaders import DatabentoDataLoader from nautilus_trader.adapters.databento.parsing import parse_record @@ -23,6 +26,7 @@ from nautilus_trader.config import InstrumentProviderConfig from nautilus_trader.core.correctness import PyCondition from nautilus_trader.model.identifiers import InstrumentId +from nautilus_trader.model.instruments import Instrument class DatabentoInstrumentProvider(InstrumentProvider): @@ -83,10 +87,12 @@ async def load_ids_async( filters: dict | None = None, ) -> None: """ - Load the given instrument IDs into the provider by requesting the latest - instrument definitions from Databento. + Load the latest instrument definitions for the given instrument IDs into the + provider by requesting the latest instrument definition messages from Databento. - You can only request instrument definitions from one venue (dataset) at a time. + You can only request instrument definitions from one dataset at a time. + The Databento dataset will be determined from either the filters, or the venues for the + instrument IDs. Parameters ---------- @@ -107,19 +113,9 @@ async def load_ids_async( """ PyCondition.not_empty(instrument_ids, "instrument_ids") - # Check all venues are equal - first_venue = instrument_ids[0].venue - for instrument_id in instrument_ids: - PyCondition.equal( - first_venue, - instrument_id.venue, - "first venue", - "instrument_id.venue", - ) - instrument_ids_to_decode = set(instrument_ids) - dataset = self._loader.get_dataset_for_venue(first_venue) + dataset = self._check_all_datasets_equal(instrument_ids) live_client = self._get_live_client(dataset) live_client.subscribe( @@ -145,25 +141,90 @@ async def load_async( instrument_id: InstrumentId, filters: dict | None = None, ) -> None: - dataset = self._loader.get_dataset_for_venue(instrument_id.venue) - live_client = self._get_live_client(dataset) + """ + Load the latest instrument definition for the given instrument ID into the + provider by requesting the latest instrument definition message from Databento. - live_client.subscribe( + The Databento dataset will be determined from either the filters, or the venue for the + instrument ID. + + Parameters + ---------- + instrument_id : InstrumentId + The instrument ID to load. + filters : dict, optional + The optional filters for the instrument definition request. + + Warnings + -------- + Calling this method will incur a cost to your Databento account in USD. + + """ + await self.load_ids_async([instrument_id]) + + async def get_range( + self, + instrument_ids: list[InstrumentId], + start: pd.Timestamp | dt.date | str | int, + end: pd.Timestamp | dt.date | str | int | None = None, + filters: dict | None = None, + ) -> list[Instrument]: + """ + Request a time series of instrument definitions for the given instrument IDs by + making a `/timeseries.get_range(...)` request from Databento. + + Parameters + ---------- + instrument_ids : list[InstrumentId] + The instrument IDs for the request. + start : pd.Timestamp or date or str or int + The start datetime of the request time range (inclusive). + Assumes UTC as timezone unless passed a tz-aware object. + If an integer is passed, then this represents nanoseconds since the UNIX epoch. + end : pd.Timestamp or date or str or int, optional + The end datetime of the request time range (exclusive). + Assumes UTC as timezone unless passed a tz-aware object. + If an integer is passed, then this represents nanoseconds since the UNIX epoch. + Values are forward-filled based on the resolution provided. + Defaults to the same value as `start`. + filters : dict, optional + The optional filters for the instrument definition request. + + Warnings + -------- + Calling this method will incur a cost to your Databento account in USD. + + """ + dataset = self._check_all_datasets_equal(instrument_ids) + data = await self._http_client.timeseries.get_range_async( dataset=dataset, schema=databento.Schema.DEFINITION, - symbols=[instrument_id.symbol.value], + start=start, + end=end, + symbols=[i.symbol.value for i in instrument_ids], stype_in=databento.SType.RAW_SYMBOL, - start=0, # From start of current session (latest definition) ) - for record in live_client: - print(record) - if isinstance(record, databento.InstrumentDefMsg): - instrument = parse_record(record, self._loader.publishers()) - self.add(instrument=instrument) - self._log.debug(f"Added instrument {instrument.id}.") - # Close the connection (we will still process all received data) - live_client.stop() + instruments: list[Instrument] = [] + + for record in data: + instrument = parse_record(record, self._loader.publishers()) + instruments.append(instrument) + + instruments = sorted(instruments, key=lambda x: x.ts_init) + return instruments + + def _check_all_datasets_equal(self, instrument_ids: list[InstrumentId]) -> str: + first_dataset = self._loader.get_dataset_for_venue(instrument_ids[0].venue) + for instrument_id in instrument_ids: + next_dataset = self._loader.get_dataset_for_venue(instrument_id.venue) + if first_dataset != next_dataset: + raise ValueError( + "Databento datasets for the provided `instrument_ids` were not equal, " + f"'{first_dataset}' vs '{next_dataset}'", + ) + + return first_dataset def _get_live_client(self, dataset: str) -> databento.Live: client = self._live_clients.get(dataset) diff --git a/tests/integration_tests/adapters/databento/sandbox/sandbox_instrument_provider.py b/tests/integration_tests/adapters/databento/sandbox/sandbox_instrument_provider.py index 93527fd172d3..bafd9d0a0a4d 100644 --- a/tests/integration_tests/adapters/databento/sandbox/sandbox_instrument_provider.py +++ b/tests/integration_tests/adapters/databento/sandbox/sandbox_instrument_provider.py @@ -15,6 +15,8 @@ import asyncio +import pandas as pd + from nautilus_trader.adapters.databento.factories import get_cached_databento_http_client from nautilus_trader.adapters.databento.providers import DatabentoInstrumentProvider from nautilus_trader.common.clock import LiveClock @@ -36,13 +38,23 @@ async def test_databento_instrument_provider(): ), ) + await provider.load_async(InstrumentId.from_str("ESH4.GLBX")) + instrument_ids = [ InstrumentId.from_str("ESZ3.GLBX"), InstrumentId.from_str("ESH4.GLBX"), InstrumentId.from_str("ESM4.GLBX"), + # InstrumentId.from_str("AAPL.XNAS"), ] await provider.load_ids_async(instrument_ids) + instruments = await provider.get_range( + instrument_ids=instrument_ids, + start=(pd.Timestamp.utcnow() - pd.Timedelta(days=5)).date().isoformat(), + ) + + print(instruments) + if __name__ == "__main__": asyncio.run(test_databento_instrument_provider())