Skip to content

Commit

Permalink
Standardize requests start and end params
Browse files Browse the repository at this point in the history
  • Loading branch information
cjdsellers committed Dec 18, 2023
1 parent d1e44e3 commit ab5170f
Show file tree
Hide file tree
Showing 16 changed files with 323 additions and 75 deletions.
12 changes: 7 additions & 5 deletions examples/live/databento/databento_subscriber.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,10 @@
# For correct subscription operation, you must specify all instruments to be immediately
# subscribed for as part of the data client configuration
instrument_ids = [
InstrumentId.from_str("AAPL.XCHI"),
# InstrumentId.from_str("ESZ3.GLBX"),
# InstrumentId.from_str("ESM4.GLBX"),
# InstrumentId.from_str("AAPL.XCHI"),
InstrumentId.from_str("ESF4.GLBX"),
InstrumentId.from_str("ESG4.GLBX"),
InstrumentId.from_str("ESH4.GLBX"),
]

# Configure the trading node
Expand Down Expand Up @@ -76,7 +77,7 @@
# snapshot_positions_interval=5.0,
data_clients={
DATABENTO: DatabentoDataClientConfig(
api_key=None, # 'BINANCE_API_KEY' env var
api_key=None, # 'DATABENTO_API_KEY' env var
http_gateway=None,
instrument_provider=InstrumentProviderConfig(load_all=True),
instrument_ids=instrument_ids,
Expand Down Expand Up @@ -149,7 +150,8 @@ def on_start(self) -> None:
self.request_trade_ticks(instrument_id)
self.request_bars(BarType.from_str(f"{instrument_id}-1-MINUTE-LAST-EXTERNAL"))

# self.request_instruments(venue=Venue("OPRA"), client_id=DATABENTO_CLIENT_ID)
# self.request_instruments(venue=Venue("GLBX"), client_id=DATABENTO_CLIENT_ID)
# self.request_instruments(venue=Venue("GLBX"), client_id=DATABENTO_CLIENT_ID)
# self.request_instruments(venue=Venue("XCHI"), client_id=DATABENTO_CLIENT_ID)
# self.request_instruments(venue=Venue("XNAS"), client_id=DATABENTO_CLIENT_ID)

Expand Down
4 changes: 4 additions & 0 deletions nautilus_trader/adapters/_template/data.py
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,8 @@ async def _request_instrument(
self,
instrument_id: InstrumentId,
correlation_id: UUID4,
start: pd.Timestamp | None = None,
end: pd.Timestamp | None = None,
) -> None:
raise NotImplementedError(
"method `_request_instrument` must be implemented in the subclass",
Expand All @@ -314,6 +316,8 @@ async def _request_instruments(
self,
venue: Venue,
correlation_id: UUID4,
start: pd.Timestamp | None = None,
end: pd.Timestamp | None = None,
) -> None:
raise NotImplementedError(
"method `_request_instruments` must be implemented in the subclass",
Expand Down
18 changes: 17 additions & 1 deletion nautilus_trader/adapters/binance/common/data.py
Original file line number Diff line number Diff line change
Expand Up @@ -475,9 +475,25 @@ async def _unsubscribe_bars(self, bar_type: BarType) -> None:

# -- REQUESTS ---------------------------------------------------------------------------------

async def _request_instrument(self, instrument_id: InstrumentId, correlation_id: UUID4) -> None:
async def _request_instrument(
self,
instrument_id: InstrumentId,
correlation_id: UUID4,
start: pd.Timestamp | None = None,
end: pd.Timestamp | None = None,
) -> None:
assert self._instrument_provider is not None # type checking

if start is not None:
self._log.warning(
f"Requesting instrument {instrument_id} with specified `start` which has no effect.",
)

if end is not None:
self._log.warning(
f"Requesting instrument {instrument_id} with specified `end` which has no effect.",
)

instrument: Instrument | None = self._instrument_provider.find(instrument_id)
if instrument is None:
self._log.error(f"Cannot find instrument for {instrument_id}.")
Expand Down
36 changes: 34 additions & 2 deletions nautilus_trader/adapters/bybit/data.py
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,23 @@ def _get_cached_instrument_id(self, symbol: str) -> InstrumentId:
nautilus_instrument_id: InstrumentId = bybit_symbol.parse_as_nautilus()
return nautilus_instrument_id

async def _request_instrument(self, instrument_id: InstrumentId, correlation_id: UUID4) -> None:
async def _request_instrument(
self,
instrument_id: InstrumentId,
correlation_id: UUID4,
start: pd.Timestamp | None = None,
end: pd.Timestamp | None = None,
) -> None:
if start is not None:
self._log.warning(
f"Requesting instrument {instrument_id} with specified `start` which has no effect.",
)

if end is not None:
self._log.warning(
f"Requesting instrument {instrument_id} with specified `end` which has no effect.",
)

instrument: Instrument | None = self._instrument_provider.find(instrument_id)
if instrument is None:
self._log.error(f"Cannot find instrument for {instrument_id}.")
Expand All @@ -265,7 +281,23 @@ async def _request_instrument(self, instrument_id: InstrumentId, correlation_id:
correlation_id=correlation_id,
)

async def _request_instruments(self, venue: Venue, correlation_id: UUID4) -> None:
async def _request_instruments(
self,
venue: Venue,
correlation_id: UUID4,
start: pd.Timestamp | None = None,
end: pd.Timestamp | None = None,
) -> None:
if start is not None:
self._log.warning(
f"Requesting instruments for {venue} with specified `start` which has no effect.",
)

if end is not None:
self._log.warning(
f"Requesting instruments for {venue} with specified `end` which has no effect.",
)

all_instruments = self._instrument_provider.get_all()
target_instruments = []
for instrument in all_instruments.values():
Expand Down
45 changes: 35 additions & 10 deletions nautilus_trader/adapters/databento/data.py
Original file line number Diff line number Diff line change
Expand Up @@ -168,8 +168,6 @@ async def _disconnect(self) -> None:

async def _buffer_mbo_subscriptions(self) -> None:
try:
self._log.info("Buffering MBO/L3 subscriptions...")

await asyncio.sleep(self._mbo_subscriptions_delay or 0.0)
self._is_buffering_mbo_subscriptions = False

Expand Down Expand Up @@ -222,6 +220,18 @@ async def _ensure_subscribed_for_instrument(self, instrument_id: InstrumentId) -
self._instrument_ids[dataset].add(instrument_id)
await self._subscribe_instrument(instrument_id)

async def _get_dataset_range(self, dataset: Dataset) -> tuple[pd.Timestamp, pd.Timestamp]:
response = await self._loop.run_in_executor(
None,
self._http_client.metadata.get_dataset_range,
dataset,
)

start = pd.Timestamp(response["start_date"], tz=pytz.utc)
end = pd.Timestamp(response["end_date"], tz=pytz.utc)

return start, end

def _load_instrument_ids(self, dataset: Dataset, instrument_ids: list[InstrumentId]) -> None:
instrument_ids_to_decode = set(instrument_ids)

Expand Down Expand Up @@ -495,18 +505,29 @@ async def _request_instrument(
self,
instrument_id: InstrumentId,
correlation_id: UUID4,
start: pd.Timestamp | None = None,
end: pd.Timestamp | None = None,
) -> None:
dataset: Dataset = self._loader.get_dataset_for_venue(instrument_id.venue)

_, available_end = await self._get_dataset_range(dataset)

date_now_utc = self._clock.utc_now().date()
start = last_weekday_nanos(
year=date_now_utc.year,
month=date_now_utc.month,
day=date_now_utc.day,
default_start = pd.Timestamp(
last_weekday_nanos(
year=date_now_utc.year,
month=date_now_utc.month,
day=date_now_utc.day,
),
tz=pytz.utc,
)

default_start = min(default_start, available_end - ONE_DAY)

data = await self._http_client.timeseries.get_range_async(
dataset=dataset,
start=start,
start=start or default_start.date().isoformat(),
end=end,
symbols=instrument_id.symbol.value,
schema=databento.Schema.DEFINITION,
)
Expand All @@ -526,9 +547,13 @@ async def _request_instruments(
self,
venue: Venue,
correlation_id: UUID4,
start: pd.Timestamp | None = None,
end: pd.Timestamp | None = None,
) -> None:
dataset: Dataset = self._loader.get_dataset_for_venue(venue)

_, available_end = await self._get_dataset_range(dataset)

date_now_utc = self._clock.utc_now().date()
default_start = pd.Timestamp(
last_weekday_nanos(
Expand All @@ -539,12 +564,12 @@ async def _request_instruments(
tz=pytz.utc,
)

if is_within_last_24_hours(default_start.value):
default_start -= ONE_DAY
default_start = min(default_start, available_end - ONE_DAY)

data = await self._http_client.timeseries.get_range_async(
dataset=dataset,
start=default_start.date().isoformat(),
start=start or default_start.date().isoformat(),
end=end,
symbols=ALL_SYMBOLS,
schema=databento.Schema.DEFINITION,
)
Expand Down
26 changes: 24 additions & 2 deletions nautilus_trader/adapters/interactive_brokers/data.py
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,23 @@ async def _request(self, data_type: DataType, correlation_id: UUID4) -> None:
"implement the `_request` coroutine", # pragma: no cover
)

async def _request_instrument(self, instrument_id: InstrumentId, correlation_id: UUID4):
async def _request_instrument(
self,
instrument_id: InstrumentId,
correlation_id: UUID4,
start: pd.Timestamp | None = None,
end: pd.Timestamp | None = None,
):
if start is not None:
self._log.warning(
f"Requesting instrument {instrument_id} with specified `start` which has no effect.",
)

if end is not None:
self._log.warning(
f"Requesting instrument {instrument_id} with specified `end` which has no effect.",
)

await self.instrument_provider.load_async(instrument_id)
if instrument := self.instrument_provider.find(instrument_id):
self._handle_data(instrument)
Expand All @@ -292,7 +308,13 @@ async def _request_instrument(self, instrument_id: InstrumentId, correlation_id:
return
self._handle_instrument(instrument, correlation_id)

async def _request_instruments(self, venue: Venue, correlation_id: UUID4):
async def _request_instruments(
self,
venue: Venue,
correlation_id: UUID4,
start: pd.Timestamp | None = None,
end: pd.Timestamp | None = None,
):
raise NotImplementedError( # pragma: no cover
"implement the `_request_instruments` coroutine", # pragma: no cover
)
Expand Down
12 changes: 6 additions & 6 deletions nautilus_trader/adapters/interactive_brokers/execution.py
Original file line number Diff line number Diff line change
Expand Up @@ -339,9 +339,9 @@ async def generate_order_status_reports(
instrument_id : InstrumentId, optional
The instrument ID query filter.
start : pd.Timestamp, optional
The start datetime query filter.
The start datetime (UTC) query filter.
end : pd.Timestamp, optional
The end datetime query filter.
The end datetime (UTC) query filter.
open_only : bool, default False
If the query is for open orders only.
Expand Down Expand Up @@ -417,9 +417,9 @@ async def generate_trade_reports(
venue_order_id : VenueOrderId, optional
The venue order ID (assigned by the venue) query filter.
start : pd.Timestamp, optional
The start datetime query filter.
The start datetime (UTC) query filter.
end : pd.Timestamp, optional
The end datetime query filter.
The end datetime (UTC) query filter.
Returns
-------
Expand All @@ -445,9 +445,9 @@ async def generate_position_status_reports(
instrument_id : InstrumentId, optional
The instrument ID query filter.
start : pd.Timestamp, optional
The start datetime query filter.
The start datetime (UTC) query filter.
end : pd.Timestamp, optional
The end datetime query filter.
The end datetime (UTC) query filter.
Returns
-------
Expand Down
16 changes: 14 additions & 2 deletions nautilus_trader/backtest/data_client.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -350,7 +350,13 @@ cdef class BacktestMarketDataClient(MarketDataClient):

# -- REQUESTS -------------------------------------------------------------------------------------

cpdef void request_instrument(self, InstrumentId instrument_id, UUID4 correlation_id):
cpdef void request_instrument(
self,
InstrumentId instrument_id,
UUID4 correlation_id,
datetime start: Optional[datetime] = None,
datetime end: Optional[datetime] = None,
):
Condition.not_none(instrument_id, "instrument_id")
Condition.not_none(correlation_id, "correlation_id")

Expand All @@ -369,7 +375,13 @@ cdef class BacktestMarketDataClient(MarketDataClient):
correlation_id=correlation_id,
)

cpdef void request_instruments(self, Venue venue, UUID4 correlation_id):
cpdef void request_instruments(
self,
Venue venue,
UUID4 correlation_id,
datetime start: Optional[datetime] = None,
datetime end: Optional[datetime] = None,
):
Condition.not_none(correlation_id, "correlation_id")

cdef list instruments = self._cache.instruments(venue)
Expand Down
8 changes: 4 additions & 4 deletions nautilus_trader/backtest/engine.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -884,11 +884,11 @@ cdef class BacktestEngine:
Parameters
----------
start : Union[datetime, str, int], optional
The start datetime (UTC) for the backtest run. If ``None`` engine runs
from the start of the data.
The start datetime (UTC) for the backtest run.
If ``None`` engine runs from the start of the data.
end : Union[datetime, str, int], optional
The end datetime (UTC) for the backtest run. If ``None`` engine runs
to the end of the data.
The end datetime (UTC) for the backtest run.
If ``None`` engine runs to the end of the data.
run_config_id : str, optional
The tokenized `BacktestRunConfig` ID.
streaming : bool, default False
Expand Down
25 changes: 22 additions & 3 deletions nautilus_trader/common/actor.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -176,9 +176,28 @@ cdef class Actor(Component):

# -- REQUESTS -------------------------------------------------------------------------------------

cpdef UUID4 request_data(self, DataType data_type, ClientId client_id, callback=*)
cpdef UUID4 request_instrument(self, InstrumentId instrument_id, ClientId client_id=*, callback=*)
cpdef UUID4 request_instruments(self, Venue venue, ClientId client_id=*, callback=*)
cpdef UUID4 request_data(
self,
DataType data_type,
ClientId client_id,
callback=*,
)
cpdef UUID4 request_instrument(
self,
InstrumentId instrument_id,
datetime start=*,
datetime end=*,
ClientId client_id=*,
callback=*,
)
cpdef UUID4 request_instruments(
self,
Venue venue,
datetime start=*,
datetime end=*,
ClientId client_id=*,
callback=*,
)
cpdef UUID4 request_quote_ticks(
self,
InstrumentId instrument_id,
Expand Down
Loading

0 comments on commit ab5170f

Please sign in to comment.