Skip to content

Commit

Permalink
Continue DatabentoInstrumentProvider
Browse files Browse the repository at this point in the history
  • Loading branch information
cjdsellers committed Dec 9, 2023
1 parent 2a9feab commit f76d018
Show file tree
Hide file tree
Showing 2 changed files with 100 additions and 27 deletions.
115 changes: 88 additions & 27 deletions nautilus_trader/adapters/databento/providers.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,10 @@
# limitations under the License.
# -------------------------------------------------------------------------------------------------

import datetime as dt

import databento
import pandas as pd

from nautilus_trader.adapters.databento.loaders import DatabentoDataLoader
from nautilus_trader.adapters.databento.parsing import parse_record
Expand All @@ -23,6 +26,7 @@
from nautilus_trader.config import InstrumentProviderConfig
from nautilus_trader.core.correctness import PyCondition
from nautilus_trader.model.identifiers import InstrumentId
from nautilus_trader.model.instruments import Instrument


class DatabentoInstrumentProvider(InstrumentProvider):
Expand Down Expand Up @@ -83,10 +87,12 @@ async def load_ids_async(
filters: dict | None = None,
) -> None:
"""
Load the given instrument IDs into the provider by requesting the latest
instrument definitions from Databento.
Load the latest instrument definitions for the given instrument IDs into the
provider by requesting the latest instrument definition messages from Databento.
You can only request instrument definitions from one venue (dataset) at a time.
You can only request instrument definitions from one dataset at a time.
The Databento dataset will be determined from either the filters, or the venues for the
instrument IDs.
Parameters
----------
Expand All @@ -107,19 +113,9 @@ async def load_ids_async(
"""
PyCondition.not_empty(instrument_ids, "instrument_ids")

# Check all venues are equal
first_venue = instrument_ids[0].venue
for instrument_id in instrument_ids:
PyCondition.equal(
first_venue,
instrument_id.venue,
"first venue",
"instrument_id.venue",
)

instrument_ids_to_decode = set(instrument_ids)

dataset = self._loader.get_dataset_for_venue(first_venue)
dataset = self._check_all_datasets_equal(instrument_ids)
live_client = self._get_live_client(dataset)

live_client.subscribe(
Expand All @@ -145,25 +141,90 @@ async def load_async(
instrument_id: InstrumentId,
filters: dict | None = None,
) -> None:
dataset = self._loader.get_dataset_for_venue(instrument_id.venue)
live_client = self._get_live_client(dataset)
"""
Load the latest instrument definition for the given instrument ID into the
provider by requesting the latest instrument definition message from Databento.
live_client.subscribe(
The Databento dataset will be determined from either the filters, or the venue for the
instrument ID.
Parameters
----------
instrument_id : InstrumentId
The instrument ID to load.
filters : dict, optional
The optional filters for the instrument definition request.
Warnings
--------
Calling this method will incur a cost to your Databento account in USD.
"""
await self.load_ids_async([instrument_id])

async def get_range(
self,
instrument_ids: list[InstrumentId],
start: pd.Timestamp | dt.date | str | int,
end: pd.Timestamp | dt.date | str | int | None = None,
filters: dict | None = None,
) -> list[Instrument]:
"""
Request a time series of instrument definitions for the given instrument IDs by
making a `/timeseries.get_range(...)` request from Databento.
Parameters
----------
instrument_ids : list[InstrumentId]
The instrument IDs for the request.
start : pd.Timestamp or date or str or int
The start datetime of the request time range (inclusive).
Assumes UTC as timezone unless passed a tz-aware object.
If an integer is passed, then this represents nanoseconds since the UNIX epoch.
end : pd.Timestamp or date or str or int, optional
The end datetime of the request time range (exclusive).
Assumes UTC as timezone unless passed a tz-aware object.
If an integer is passed, then this represents nanoseconds since the UNIX epoch.
Values are forward-filled based on the resolution provided.
Defaults to the same value as `start`.
filters : dict, optional
The optional filters for the instrument definition request.
Warnings
--------
Calling this method will incur a cost to your Databento account in USD.
"""
dataset = self._check_all_datasets_equal(instrument_ids)
data = await self._http_client.timeseries.get_range_async(
dataset=dataset,
schema=databento.Schema.DEFINITION,
symbols=[instrument_id.symbol.value],
start=start,
end=end,
symbols=[i.symbol.value for i in instrument_ids],
stype_in=databento.SType.RAW_SYMBOL,
start=0, # From start of current session (latest definition)
)
for record in live_client:
print(record)
if isinstance(record, databento.InstrumentDefMsg):
instrument = parse_record(record, self._loader.publishers())
self.add(instrument=instrument)
self._log.debug(f"Added instrument {instrument.id}.")

# Close the connection (we will still process all received data)
live_client.stop()
instruments: list[Instrument] = []

for record in data:
instrument = parse_record(record, self._loader.publishers())
instruments.append(instrument)

instruments = sorted(instruments, key=lambda x: x.ts_init)
return instruments

def _check_all_datasets_equal(self, instrument_ids: list[InstrumentId]) -> str:
first_dataset = self._loader.get_dataset_for_venue(instrument_ids[0].venue)
for instrument_id in instrument_ids:
next_dataset = self._loader.get_dataset_for_venue(instrument_id.venue)
if first_dataset != next_dataset:
raise ValueError(
"Databento datasets for the provided `instrument_ids` were not equal, "
f"'{first_dataset}' vs '{next_dataset}'",
)

return first_dataset

def _get_live_client(self, dataset: str) -> databento.Live:
client = self._live_clients.get(dataset)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@

import asyncio

import pandas as pd

from nautilus_trader.adapters.databento.factories import get_cached_databento_http_client
from nautilus_trader.adapters.databento.providers import DatabentoInstrumentProvider
from nautilus_trader.common.clock import LiveClock
Expand All @@ -36,13 +38,23 @@ async def test_databento_instrument_provider():
),
)

await provider.load_async(InstrumentId.from_str("ESH4.GLBX"))

instrument_ids = [
InstrumentId.from_str("ESZ3.GLBX"),
InstrumentId.from_str("ESH4.GLBX"),
InstrumentId.from_str("ESM4.GLBX"),
# InstrumentId.from_str("AAPL.XNAS"),
]
await provider.load_ids_async(instrument_ids)

instruments = await provider.get_range(
instrument_ids=instrument_ids,
start=(pd.Timestamp.utcnow() - pd.Timedelta(days=5)).date().isoformat(),
)

print(instruments)


if __name__ == "__main__":
asyncio.run(test_databento_instrument_provider())

0 comments on commit f76d018

Please sign in to comment.