From ab5170fc5311e4ff0c046a061d1ee88bc67d782a Mon Sep 17 00:00:00 2001 From: Chris Sellers Date: Mon, 18 Dec 2023 12:10:47 +1100 Subject: [PATCH] Standardize requests start and end params --- .../live/databento/databento_subscriber.py | 12 ++-- nautilus_trader/adapters/_template/data.py | 4 ++ .../adapters/binance/common/data.py | 18 +++++- nautilus_trader/adapters/bybit/data.py | 36 +++++++++++- nautilus_trader/adapters/databento/data.py | 45 ++++++++++---- .../adapters/interactive_brokers/data.py | 26 ++++++++- .../adapters/interactive_brokers/execution.py | 12 ++-- nautilus_trader/backtest/data_client.pyx | 16 ++++- nautilus_trader/backtest/engine.pyx | 8 +-- nautilus_trader/common/actor.pxd | 25 +++++++- nautilus_trader/common/actor.pyx | 58 ++++++++++++++----- nautilus_trader/data/client.pxd | 16 ++++- nautilus_trader/data/client.pyx | 44 ++++++++++---- nautilus_trader/data/engine.pyx | 14 ++++- nautilus_trader/live/data_client.py | 52 +++++++++++++++-- nautilus_trader/live/execution_client.py | 12 ++-- 16 files changed, 323 insertions(+), 75 deletions(-) diff --git a/examples/live/databento/databento_subscriber.py b/examples/live/databento/databento_subscriber.py index 45f439a53c3b..aa8f8d053f44 100644 --- a/examples/live/databento/databento_subscriber.py +++ b/examples/live/databento/databento_subscriber.py @@ -41,9 +41,10 @@ # For correct subscription operation, you must specify all instruments to be immediately # subscribed for as part of the data client configuration instrument_ids = [ - InstrumentId.from_str("AAPL.XCHI"), - # InstrumentId.from_str("ESZ3.GLBX"), - # InstrumentId.from_str("ESM4.GLBX"), + # InstrumentId.from_str("AAPL.XCHI"), + InstrumentId.from_str("ESF4.GLBX"), + InstrumentId.from_str("ESG4.GLBX"), + InstrumentId.from_str("ESH4.GLBX"), ] # Configure the trading node @@ -76,7 +77,7 @@ # snapshot_positions_interval=5.0, data_clients={ DATABENTO: DatabentoDataClientConfig( - api_key=None, # 'BINANCE_API_KEY' env var + api_key=None, # 'DATABENTO_API_KEY' env var http_gateway=None, instrument_provider=InstrumentProviderConfig(load_all=True), instrument_ids=instrument_ids, @@ -149,7 +150,8 @@ def on_start(self) -> None: self.request_trade_ticks(instrument_id) self.request_bars(BarType.from_str(f"{instrument_id}-1-MINUTE-LAST-EXTERNAL")) - # self.request_instruments(venue=Venue("OPRA"), client_id=DATABENTO_CLIENT_ID) + # self.request_instruments(venue=Venue("GLBX"), client_id=DATABENTO_CLIENT_ID) + # self.request_instruments(venue=Venue("GLBX"), client_id=DATABENTO_CLIENT_ID) # self.request_instruments(venue=Venue("XCHI"), client_id=DATABENTO_CLIENT_ID) # self.request_instruments(venue=Venue("XNAS"), client_id=DATABENTO_CLIENT_ID) diff --git a/nautilus_trader/adapters/_template/data.py b/nautilus_trader/adapters/_template/data.py index 9d25edd2f36a..e1303780f017 100644 --- a/nautilus_trader/adapters/_template/data.py +++ b/nautilus_trader/adapters/_template/data.py @@ -305,6 +305,8 @@ async def _request_instrument( self, instrument_id: InstrumentId, correlation_id: UUID4, + start: pd.Timestamp | None = None, + end: pd.Timestamp | None = None, ) -> None: raise NotImplementedError( "method `_request_instrument` must be implemented in the subclass", @@ -314,6 +316,8 @@ async def _request_instruments( self, venue: Venue, correlation_id: UUID4, + start: pd.Timestamp | None = None, + end: pd.Timestamp | None = None, ) -> None: raise NotImplementedError( "method `_request_instruments` must be implemented in the subclass", diff --git a/nautilus_trader/adapters/binance/common/data.py b/nautilus_trader/adapters/binance/common/data.py index 78fce1bf36aa..d5cfa7f2f450 100644 --- a/nautilus_trader/adapters/binance/common/data.py +++ b/nautilus_trader/adapters/binance/common/data.py @@ -475,9 +475,25 @@ async def _unsubscribe_bars(self, bar_type: BarType) -> None: # -- REQUESTS --------------------------------------------------------------------------------- - async def _request_instrument(self, instrument_id: InstrumentId, correlation_id: UUID4) -> None: + async def _request_instrument( + self, + instrument_id: InstrumentId, + correlation_id: UUID4, + start: pd.Timestamp | None = None, + end: pd.Timestamp | None = None, + ) -> None: assert self._instrument_provider is not None # type checking + if start is not None: + self._log.warning( + f"Requesting instrument {instrument_id} with specified `start` which has no effect.", + ) + + if end is not None: + self._log.warning( + f"Requesting instrument {instrument_id} with specified `end` which has no effect.", + ) + instrument: Instrument | None = self._instrument_provider.find(instrument_id) if instrument is None: self._log.error(f"Cannot find instrument for {instrument_id}.") diff --git a/nautilus_trader/adapters/bybit/data.py b/nautilus_trader/adapters/bybit/data.py index cb98e63cd6db..2032a3ce9710 100644 --- a/nautilus_trader/adapters/bybit/data.py +++ b/nautilus_trader/adapters/bybit/data.py @@ -250,7 +250,23 @@ def _get_cached_instrument_id(self, symbol: str) -> InstrumentId: nautilus_instrument_id: InstrumentId = bybit_symbol.parse_as_nautilus() return nautilus_instrument_id - async def _request_instrument(self, instrument_id: InstrumentId, correlation_id: UUID4) -> None: + async def _request_instrument( + self, + instrument_id: InstrumentId, + correlation_id: UUID4, + start: pd.Timestamp | None = None, + end: pd.Timestamp | None = None, + ) -> None: + if start is not None: + self._log.warning( + f"Requesting instrument {instrument_id} with specified `start` which has no effect.", + ) + + if end is not None: + self._log.warning( + f"Requesting instrument {instrument_id} with specified `end` which has no effect.", + ) + instrument: Instrument | None = self._instrument_provider.find(instrument_id) if instrument is None: self._log.error(f"Cannot find instrument for {instrument_id}.") @@ -265,7 +281,23 @@ async def _request_instrument(self, instrument_id: InstrumentId, correlation_id: correlation_id=correlation_id, ) - async def _request_instruments(self, venue: Venue, correlation_id: UUID4) -> None: + async def _request_instruments( + self, + venue: Venue, + correlation_id: UUID4, + start: pd.Timestamp | None = None, + end: pd.Timestamp | None = None, + ) -> None: + if start is not None: + self._log.warning( + f"Requesting instruments for {venue} with specified `start` which has no effect.", + ) + + if end is not None: + self._log.warning( + f"Requesting instruments for {venue} with specified `end` which has no effect.", + ) + all_instruments = self._instrument_provider.get_all() target_instruments = [] for instrument in all_instruments.values(): diff --git a/nautilus_trader/adapters/databento/data.py b/nautilus_trader/adapters/databento/data.py index d4636a7b5a89..fc3ee3344410 100644 --- a/nautilus_trader/adapters/databento/data.py +++ b/nautilus_trader/adapters/databento/data.py @@ -168,8 +168,6 @@ async def _disconnect(self) -> None: async def _buffer_mbo_subscriptions(self) -> None: try: - self._log.info("Buffering MBO/L3 subscriptions...") - await asyncio.sleep(self._mbo_subscriptions_delay or 0.0) self._is_buffering_mbo_subscriptions = False @@ -222,6 +220,18 @@ async def _ensure_subscribed_for_instrument(self, instrument_id: InstrumentId) - self._instrument_ids[dataset].add(instrument_id) await self._subscribe_instrument(instrument_id) + async def _get_dataset_range(self, dataset: Dataset) -> tuple[pd.Timestamp, pd.Timestamp]: + response = await self._loop.run_in_executor( + None, + self._http_client.metadata.get_dataset_range, + dataset, + ) + + start = pd.Timestamp(response["start_date"], tz=pytz.utc) + end = pd.Timestamp(response["end_date"], tz=pytz.utc) + + return start, end + def _load_instrument_ids(self, dataset: Dataset, instrument_ids: list[InstrumentId]) -> None: instrument_ids_to_decode = set(instrument_ids) @@ -495,18 +505,29 @@ async def _request_instrument( self, instrument_id: InstrumentId, correlation_id: UUID4, + start: pd.Timestamp | None = None, + end: pd.Timestamp | None = None, ) -> None: dataset: Dataset = self._loader.get_dataset_for_venue(instrument_id.venue) + + _, available_end = await self._get_dataset_range(dataset) + date_now_utc = self._clock.utc_now().date() - start = last_weekday_nanos( - year=date_now_utc.year, - month=date_now_utc.month, - day=date_now_utc.day, + default_start = pd.Timestamp( + last_weekday_nanos( + year=date_now_utc.year, + month=date_now_utc.month, + day=date_now_utc.day, + ), + tz=pytz.utc, ) + default_start = min(default_start, available_end - ONE_DAY) + data = await self._http_client.timeseries.get_range_async( dataset=dataset, - start=start, + start=start or default_start.date().isoformat(), + end=end, symbols=instrument_id.symbol.value, schema=databento.Schema.DEFINITION, ) @@ -526,9 +547,13 @@ async def _request_instruments( self, venue: Venue, correlation_id: UUID4, + start: pd.Timestamp | None = None, + end: pd.Timestamp | None = None, ) -> None: dataset: Dataset = self._loader.get_dataset_for_venue(venue) + _, available_end = await self._get_dataset_range(dataset) + date_now_utc = self._clock.utc_now().date() default_start = pd.Timestamp( last_weekday_nanos( @@ -539,12 +564,12 @@ async def _request_instruments( tz=pytz.utc, ) - if is_within_last_24_hours(default_start.value): - default_start -= ONE_DAY + default_start = min(default_start, available_end - ONE_DAY) data = await self._http_client.timeseries.get_range_async( dataset=dataset, - start=default_start.date().isoformat(), + start=start or default_start.date().isoformat(), + end=end, symbols=ALL_SYMBOLS, schema=databento.Schema.DEFINITION, ) diff --git a/nautilus_trader/adapters/interactive_brokers/data.py b/nautilus_trader/adapters/interactive_brokers/data.py index 671bf7ea8f7d..3acccbe780d5 100644 --- a/nautilus_trader/adapters/interactive_brokers/data.py +++ b/nautilus_trader/adapters/interactive_brokers/data.py @@ -283,7 +283,23 @@ async def _request(self, data_type: DataType, correlation_id: UUID4) -> None: "implement the `_request` coroutine", # pragma: no cover ) - async def _request_instrument(self, instrument_id: InstrumentId, correlation_id: UUID4): + async def _request_instrument( + self, + instrument_id: InstrumentId, + correlation_id: UUID4, + start: pd.Timestamp | None = None, + end: pd.Timestamp | None = None, + ): + if start is not None: + self._log.warning( + f"Requesting instrument {instrument_id} with specified `start` which has no effect.", + ) + + if end is not None: + self._log.warning( + f"Requesting instrument {instrument_id} with specified `end` which has no effect.", + ) + await self.instrument_provider.load_async(instrument_id) if instrument := self.instrument_provider.find(instrument_id): self._handle_data(instrument) @@ -292,7 +308,13 @@ async def _request_instrument(self, instrument_id: InstrumentId, correlation_id: return self._handle_instrument(instrument, correlation_id) - async def _request_instruments(self, venue: Venue, correlation_id: UUID4): + async def _request_instruments( + self, + venue: Venue, + correlation_id: UUID4, + start: pd.Timestamp | None = None, + end: pd.Timestamp | None = None, + ): raise NotImplementedError( # pragma: no cover "implement the `_request_instruments` coroutine", # pragma: no cover ) diff --git a/nautilus_trader/adapters/interactive_brokers/execution.py b/nautilus_trader/adapters/interactive_brokers/execution.py index 0a19be01079e..1df4ac28839d 100644 --- a/nautilus_trader/adapters/interactive_brokers/execution.py +++ b/nautilus_trader/adapters/interactive_brokers/execution.py @@ -339,9 +339,9 @@ async def generate_order_status_reports( instrument_id : InstrumentId, optional The instrument ID query filter. start : pd.Timestamp, optional - The start datetime query filter. + The start datetime (UTC) query filter. end : pd.Timestamp, optional - The end datetime query filter. + The end datetime (UTC) query filter. open_only : bool, default False If the query is for open orders only. @@ -417,9 +417,9 @@ async def generate_trade_reports( venue_order_id : VenueOrderId, optional The venue order ID (assigned by the venue) query filter. start : pd.Timestamp, optional - The start datetime query filter. + The start datetime (UTC) query filter. end : pd.Timestamp, optional - The end datetime query filter. + The end datetime (UTC) query filter. Returns ------- @@ -445,9 +445,9 @@ async def generate_position_status_reports( instrument_id : InstrumentId, optional The instrument ID query filter. start : pd.Timestamp, optional - The start datetime query filter. + The start datetime (UTC) query filter. end : pd.Timestamp, optional - The end datetime query filter. + The end datetime (UTC) query filter. Returns ------- diff --git a/nautilus_trader/backtest/data_client.pyx b/nautilus_trader/backtest/data_client.pyx index 923f4e98e808..acf1c7f4adf8 100644 --- a/nautilus_trader/backtest/data_client.pyx +++ b/nautilus_trader/backtest/data_client.pyx @@ -350,7 +350,13 @@ cdef class BacktestMarketDataClient(MarketDataClient): # -- REQUESTS ------------------------------------------------------------------------------------- - cpdef void request_instrument(self, InstrumentId instrument_id, UUID4 correlation_id): + cpdef void request_instrument( + self, + InstrumentId instrument_id, + UUID4 correlation_id, + datetime start: Optional[datetime] = None, + datetime end: Optional[datetime] = None, + ): Condition.not_none(instrument_id, "instrument_id") Condition.not_none(correlation_id, "correlation_id") @@ -369,7 +375,13 @@ cdef class BacktestMarketDataClient(MarketDataClient): correlation_id=correlation_id, ) - cpdef void request_instruments(self, Venue venue, UUID4 correlation_id): + cpdef void request_instruments( + self, + Venue venue, + UUID4 correlation_id, + datetime start: Optional[datetime] = None, + datetime end: Optional[datetime] = None, + ): Condition.not_none(correlation_id, "correlation_id") cdef list instruments = self._cache.instruments(venue) diff --git a/nautilus_trader/backtest/engine.pyx b/nautilus_trader/backtest/engine.pyx index abd23cc68554..96ff5c57c671 100644 --- a/nautilus_trader/backtest/engine.pyx +++ b/nautilus_trader/backtest/engine.pyx @@ -884,11 +884,11 @@ cdef class BacktestEngine: Parameters ---------- start : Union[datetime, str, int], optional - The start datetime (UTC) for the backtest run. If ``None`` engine runs - from the start of the data. + The start datetime (UTC) for the backtest run. + If ``None`` engine runs from the start of the data. end : Union[datetime, str, int], optional - The end datetime (UTC) for the backtest run. If ``None`` engine runs - to the end of the data. + The end datetime (UTC) for the backtest run. + If ``None`` engine runs to the end of the data. run_config_id : str, optional The tokenized `BacktestRunConfig` ID. streaming : bool, default False diff --git a/nautilus_trader/common/actor.pxd b/nautilus_trader/common/actor.pxd index edcf4fecd166..ba1ba3ecb27d 100644 --- a/nautilus_trader/common/actor.pxd +++ b/nautilus_trader/common/actor.pxd @@ -176,9 +176,28 @@ cdef class Actor(Component): # -- REQUESTS ------------------------------------------------------------------------------------- - cpdef UUID4 request_data(self, DataType data_type, ClientId client_id, callback=*) - cpdef UUID4 request_instrument(self, InstrumentId instrument_id, ClientId client_id=*, callback=*) - cpdef UUID4 request_instruments(self, Venue venue, ClientId client_id=*, callback=*) + cpdef UUID4 request_data( + self, + DataType data_type, + ClientId client_id, + callback=*, + ) + cpdef UUID4 request_instrument( + self, + InstrumentId instrument_id, + datetime start=*, + datetime end=*, + ClientId client_id=*, + callback=*, + ) + cpdef UUID4 request_instruments( + self, + Venue venue, + datetime start=*, + datetime end=*, + ClientId client_id=*, + callback=*, + ) cpdef UUID4 request_quote_ticks( self, InstrumentId instrument_id, diff --git a/nautilus_trader/common/actor.pyx b/nautilus_trader/common/actor.pyx index e7de5527a752..2bb9d877ff9d 100644 --- a/nautilus_trader/common/actor.pyx +++ b/nautilus_trader/common/actor.pyx @@ -2014,9 +2014,9 @@ cdef class Actor(Component): If `callback` is not `None` and not of type `Callable`. """ + Condition.true(self.trader_id is not None, "The actor has not been registered") Condition.not_none(client_id, "client_id") Condition.not_none(data_type, "data_type") - Condition.true(self.trader_id is not None, "The actor has not been registered") Condition.callable_or_none(callback, "callback") cdef UUID4 request_id = UUID4() @@ -2037,16 +2037,25 @@ cdef class Actor(Component): cpdef UUID4 request_instrument( self, InstrumentId instrument_id, + datetime start = None, + datetime end = None, ClientId client_id = None, callback: Callable[[UUID4], None] | None = None, ): """ Request `Instrument` data for the given instrument ID. + If `end` is ``None`` then will request up to the most recent data. + Parameters ---------- instrument_id : InstrumentId The instrument ID for the request. + start : datetime, optional + The start datetime (UTC) of request time range (inclusive). + end : datetime, optional + The end datetime (UTC) of request time range. + The inclusiveness depends on individual data client implementation. client_id : ClientId, optional The specific client ID for the command. If ``None`` then will be inferred from the venue in the instrument ID. @@ -2061,11 +2070,16 @@ cdef class Actor(Component): Raises ------ + ValueError + If `start` and `end` are not `None` and `start` is >= `end`. TypeError If `callback` is not `None` and not of type `Callable`. """ + Condition.true(self.trader_id is not None, "The actor has not been registered") Condition.not_none(instrument_id, "instrument_id") + if start is not None and end is not None: + Condition.true(start < end, "start was >= end") Condition.callable_or_none(callback, "callback") cdef UUID4 request_id = UUID4() @@ -2074,6 +2088,8 @@ cdef class Actor(Component): venue=instrument_id.venue, data_type=DataType(Instrument, metadata={ "instrument_id": instrument_id, + "start": start, + "end": end, }), callback=self._handle_instrument_response, request_id=request_id, @@ -2088,16 +2104,25 @@ cdef class Actor(Component): cpdef UUID4 request_instruments( self, Venue venue, + datetime start = None, + datetime end = None, ClientId client_id = None, callback: Callable[[UUID4], None] | None = None, ): """ Request all `Instrument` data for the given venue. + If `end` is ``None`` then will request up to the most recent data. + Parameters ---------- venue : Venue The venue for the request. + start : datetime, optional + The start datetime (UTC) of request time range (inclusive). + end : datetime, optional + The end datetime (UTC) of request time range. + The inclusiveness depends on individual data client implementation. client_id : ClientId, optional The specific client ID for the command. If ``None`` then will be inferred from the venue in the instrument ID. @@ -2112,11 +2137,16 @@ cdef class Actor(Component): Raises ------ + ValueError + If `start` and `end` are not `None` and `start` is >= `end`. TypeError If `callback` is not `None` and not of type `Callable`. """ + Condition.true(self.trader_id is not None, "The actor has not been registered") Condition.not_none(venue, "venue") + if start is not None and end is not None: + Condition.true(start < end, "start was >= end") Condition.callable_or_none(callback, "callback") cdef UUID4 request_id = UUID4() @@ -2125,6 +2155,8 @@ cdef class Actor(Component): venue=venue, data_type=DataType(Instrument, metadata={ "venue": venue, + "start": start, + "end": end, }), callback=self._handle_instruments_response, request_id=request_id, @@ -2156,8 +2188,8 @@ cdef class Actor(Component): start : datetime, optional The start datetime (UTC) of request time range (inclusive). end : datetime, optional - The end datetime (UTC) of request time range (inclusive). - If ``None`` then will default to the current datetime (UTC). + The end datetime (UTC) of request time range. + The inclusiveness depends on individual data client implementation. client_id : ClientId, optional The specific client ID for the command. If ``None`` then will be inferred from the venue in the instrument ID. @@ -2173,15 +2205,15 @@ cdef class Actor(Component): Raises ------ ValueError - If `start` is not less than `end`. + If `start` and `end` are not `None` and `start` is >= `end`. TypeError If `callback` is not `None` and not of type `Callable`. """ + Condition.true(self.trader_id is not None, "The actor has not been registered") Condition.not_none(instrument_id, "instrument_id") if start is not None and end is not None: Condition.true(start < end, "start was >= end") - Condition.true(self.trader_id is not None, "The actor has not been registered") Condition.callable_or_none(callback, "callback") cdef UUID4 request_id = UUID4() @@ -2223,8 +2255,8 @@ cdef class Actor(Component): start : datetime, optional The start datetime (UTC) of request time range (inclusive). end : datetime, optional - The end datetime (UTC) of request time range (inclusive). - If ``None`` then will default to the current datetime (UTC). + The end datetime (UTC) of request time range. + The inclusiveness depends on individual data client implementation. client_id : ClientId, optional The specific client ID for the command. If ``None`` then will be inferred from the venue in the instrument ID. @@ -2240,15 +2272,15 @@ cdef class Actor(Component): Raises ------ ValueError - If `start` is not less than `end`. + If `start` and `end` are not `None` and `start` is >= `end`. TypeError If `callback` is not `None` and not of type `Callable`. """ + Condition.true(self.trader_id is not None, "The actor has not been registered") Condition.not_none(instrument_id, "instrument_id") if start is not None and end is not None: Condition.true(start < end, "start was >= end") - Condition.true(self.trader_id is not None, "The actor has not been registered") Condition.callable_or_none(callback, "callback") cdef UUID4 request_id = UUID4() @@ -2290,8 +2322,8 @@ cdef class Actor(Component): start : datetime, optional The start datetime (UTC) of request time range (inclusive). end : datetime, optional - The end datetime (UTC) of request time range (inclusive). - If ``None`` then will default to the current datetime (UTC). + The end datetime (UTC) of request time range. + The inclusiveness depends on individual data client implementation. client_id : ClientId, optional The specific client ID for the command. If ``None`` then will be inferred from the venue in the instrument ID. @@ -2307,15 +2339,15 @@ cdef class Actor(Component): Raises ------ ValueError - If `start` is not less than `end`. + If `start` and `end` are not `None` and `start` is >= `end`. TypeError If `callback` is not `None` and not of type `Callable`. """ + Condition.true(self.trader_id is not None, "The actor has not been registered") Condition.not_none(bar_type, "bar_type") if start is not None and end is not None: Condition.true(start < end, "start was >= end") - Condition.true(self.trader_id is not None, "The actor has not been registered") Condition.callable_or_none(callback, "callback") cdef UUID4 request_id = UUID4() diff --git a/nautilus_trader/data/client.pxd b/nautilus_trader/data/client.pxd index 3094f689b7b0..76a9edb0bac9 100644 --- a/nautilus_trader/data/client.pxd +++ b/nautilus_trader/data/client.pxd @@ -132,8 +132,20 @@ cdef class MarketDataClient(DataClient): # -- REQUEST HANDLERS ----------------------------------------------------------------------------- - cpdef void request_instrument(self, InstrumentId instrument_id, UUID4 correlation_id) - cpdef void request_instruments(self, Venue venue, UUID4 correlation_id) + cpdef void request_instrument( + self, + InstrumentId instrument_id, + UUID4 correlation_id, + datetime start=*, + datetime end=*, + ) + cpdef void request_instruments( + self, + Venue venue, + UUID4 correlation_id, + datetime start=*, + datetime end=*, + ) cpdef void request_quote_ticks( self, InstrumentId instrument_id, diff --git a/nautilus_trader/data/client.pyx b/nautilus_trader/data/client.pyx index 3a221fa16ee7..59348e7e20a3 100644 --- a/nautilus_trader/data/client.pyx +++ b/nautilus_trader/data/client.pyx @@ -879,7 +879,13 @@ cdef class MarketDataClient(DataClient): # -- REQUESTS ------------------------------------------------------------------------------------- - cpdef void request_instrument(self, InstrumentId instrument_id, UUID4 correlation_id): + cpdef void request_instrument( + self, + InstrumentId instrument_id, + UUID4 correlation_id, + datetime start = None, + datetime end = None, + ): """ Request `Instrument` data for the given instrument ID. @@ -889,6 +895,11 @@ cdef class MarketDataClient(DataClient): The instrument ID for the request. correlation_id : UUID4 The correlation ID for the request. + start : datetime, optional + The start datetime (UTC) of request time range (inclusive). + end : datetime, optional + The end datetime (UTC) of request time range. + The inclusiveness depends on individual data client implementation. """ self._log.error( # pragma: no cover @@ -896,7 +907,13 @@ cdef class MarketDataClient(DataClient): f"You can implement by overriding the `request_instrument` method for this client.", # pragma: no cover # noqa ) - cpdef void request_instruments(self, Venue venue, UUID4 correlation_id): + cpdef void request_instruments( + self, + Venue venue, + UUID4 correlation_id, + datetime start = None, + datetime end = None, + ): """ Request all `Instrument` data for the given venue. @@ -906,6 +923,11 @@ cdef class MarketDataClient(DataClient): The venue for the request. correlation_id : UUID4 The correlation ID for the request. + start : datetime, optional + The start datetime (UTC) of request time range (inclusive). + end : datetime, optional + The end datetime (UTC) of request time range. + The inclusiveness depends on individual data client implementation. """ self._log.error( # pragma: no cover @@ -933,10 +955,10 @@ cdef class MarketDataClient(DataClient): correlation_id : UUID4 The correlation ID for the request. start : datetime, optional - The specified from datetime for the data. + The start datetime (UTC) of request time range (inclusive). end : datetime, optional - The specified to datetime for the data. If ``None`` then will default - to the current datetime. + The end datetime (UTC) of request time range. + The inclusiveness depends on individual data client implementation. """ self._log.error( # pragma: no cover @@ -964,10 +986,10 @@ cdef class MarketDataClient(DataClient): correlation_id : UUID4 The correlation ID for the request. start : datetime, optional - The specified from datetime for the data. + The start datetime (UTC) of request time range (inclusive). end : datetime, optional - The specified to datetime for the data. If ``None`` then will default - to the current datetime. + The end datetime (UTC) of request time range. + The inclusiveness depends on individual data client implementation. """ self._log.error( # pragma: no cover @@ -995,10 +1017,10 @@ cdef class MarketDataClient(DataClient): correlation_id : UUID4 The correlation ID for the request. start : datetime, optional - The specified from datetime for the data. + The start datetime (UTC) of request time range (inclusive). end : datetime, optional - The specified to datetime for the data. If ``None`` then will default - to the current datetime. + The end datetime (UTC) of request time range. + The inclusiveness depends on individual data client implementation. """ self._log.error( # pragma: no cover diff --git a/nautilus_trader/data/engine.pyx b/nautilus_trader/data/engine.pyx index ac9c714fe6d1..eb8821b62f09 100644 --- a/nautilus_trader/data/engine.pyx +++ b/nautilus_trader/data/engine.pyx @@ -1245,9 +1245,19 @@ cdef class DataEngine(Component): Condition.true(isinstance(client, MarketDataClient), "client was not a MarketDataClient") instrument_id = request.data_type.metadata.get("instrument_id") if instrument_id is None: - client.request_instruments(request.data_type.metadata.get("venue"), request.id) + client.request_instruments( + request.data_type.metadata.get("venue"), + request.id, + request.data_type.metadata.get("start"), + request.data_type.metadata.get("end"), + ) else: - client.request_instrument(instrument_id, request.id) + client.request_instrument( + instrument_id, + request.id, + request.data_type.metadata.get("start"), + request.data_type.metadata.get("end"), + ) elif request.data_type.type == QuoteTick: Condition.true(isinstance(client, MarketDataClient), "client was not a MarketDataClient") client.request_quote_ticks( diff --git a/nautilus_trader/live/data_client.py b/nautilus_trader/live/data_client.py index 32d41aef486c..b80e737619b2 100644 --- a/nautilus_trader/live/data_client.py +++ b/nautilus_trader/live/data_client.py @@ -619,21 +619,46 @@ def unsubscribe_instrument_close(self, instrument_id: InstrumentId) -> None: # -- REQUESTS --------------------------------------------------------------------------------- def request(self, data_type: DataType, correlation_id: UUID4) -> None: + self._log.debug(f"Request data {data_type}.") self.create_task( self._request(data_type, correlation_id), log_msg=f"request: {data_type}", ) - def request_instrument(self, instrument_id: InstrumentId, correlation_id: UUID4) -> None: + def request_instrument( + self, + instrument_id: InstrumentId, + correlation_id: UUID4, + start: pd.Timestamp | None = None, + end: pd.Timestamp | None = None, + ) -> None: + self._log.debug(f"Request instrument {instrument_id}.") self.create_task( - self._request_instrument(instrument_id, correlation_id), + self._request_instrument( + instrument_id=instrument_id, + correlation_id=correlation_id, + start=start, + end=end, + ), log_msg=f"request: instrument {instrument_id}", ) - def request_instruments(self, venue: Venue, correlation_id: UUID4) -> None: + def request_instruments( + self, + venue: Venue, + correlation_id: UUID4, + start: pd.Timestamp | None = None, + end: pd.Timestamp | None = None, + ) -> None: self._log.debug(f"Request instruments for {venue} {correlation_id}.") self.create_task( - self._request_instruments(venue, correlation_id), + self._request_instruments( + venue=venue, + correlation_id=correlation_id, + start=start, + end=end, + ), + log_msg=f"request: instruments for {venue}", ) def request_quote_ticks( @@ -653,6 +678,7 @@ def request_quote_ticks( start=start, end=end, ), + log_msg=f"request: quote ticks {instrument_id}", ) def request_trade_ticks( @@ -672,6 +698,7 @@ def request_trade_ticks( start=start, end=end, ), + log_msg=f"request: trade ticks {instrument_id}", ) def request_bars( @@ -691,6 +718,7 @@ def request_bars( start=start, end=end, ), + log_msg=f"request: bars {bar_type}", ) ############################################################################ @@ -833,12 +861,24 @@ async def _request(self, data_type: DataType, correlation_id: UUID4) -> None: "implement the `_request` coroutine", # pragma: no cover ) - async def _request_instrument(self, instrument_id: InstrumentId, correlation_id: UUID4) -> None: + async def _request_instrument( + self, + instrument_id: InstrumentId, + correlation_id: UUID4, + start: pd.Timestamp | None = None, + end: pd.Timestamp | None = None, + ) -> None: raise NotImplementedError( # pragma: no cover "implement the `_request_instrument` coroutine", # pragma: no cover ) - async def _request_instruments(self, venue: Venue, correlation_id: UUID4) -> None: + async def _request_instruments( + self, + venue: Venue, + correlation_id: UUID4, + start: pd.Timestamp | None = None, + end: pd.Timestamp | None = None, + ) -> None: raise NotImplementedError( # pragma: no cover "implement the `_request_instruments` coroutine", # pragma: no cover ) diff --git a/nautilus_trader/live/execution_client.py b/nautilus_trader/live/execution_client.py index d606276be5b8..0df7597d628b 100644 --- a/nautilus_trader/live/execution_client.py +++ b/nautilus_trader/live/execution_client.py @@ -334,9 +334,9 @@ async def generate_order_status_reports( instrument_id : InstrumentId, optional The instrument ID query filter. start : pd.Timestamp, optional - The start datetime query filter. + The start datetime (UTC) query filter. end : pd.Timestamp, optional - The end datetime query filter. + The end datetime (UTC) query filter. open_only : bool, default False If the query is for open orders only. @@ -368,9 +368,9 @@ async def generate_trade_reports( venue_order_id : VenueOrderId, optional The venue order ID (assigned by the venue) query filter. start : pd.Timestamp, optional - The start datetime query filter. + The start datetime (UTC) query filter. end : pd.Timestamp, optional - The end datetime query filter. + The end datetime (UTC) query filter. Returns ------- @@ -397,9 +397,9 @@ async def generate_position_status_reports( instrument_id : InstrumentId, optional The instrument ID query filter. start : pd.Timestamp, optional - The start datetime query filter. + The start datetime (UTC) query filter. end : pd.Timestamp, optional - The end datetime query filter. + The end datetime (UTC) query filter. Returns -------