From 0082d061c796e4c8885f54dc1db53dd915bbbbcf Mon Sep 17 00:00:00 2001 From: Chris Sellers Date: Sat, 21 Oct 2023 00:08:59 +1100 Subject: [PATCH] Add subscribe_bars await_partial option --- nautilus_trader/common/actor.pxd | 2 +- nautilus_trader/common/actor.pyx | 17 ++++++++++++++-- nautilus_trader/data/aggregation.pxd | 1 + nautilus_trader/data/aggregation.pyx | 29 ++++++++++++++++++---------- nautilus_trader/data/engine.pxd | 4 ++-- nautilus_trader/data/engine.pyx | 16 +++++++++++++-- 6 files changed, 52 insertions(+), 17 deletions(-) diff --git a/nautilus_trader/common/actor.pxd b/nautilus_trader/common/actor.pxd index bec7e1afb3f2..dd058727b89e 100644 --- a/nautilus_trader/common/actor.pxd +++ b/nautilus_trader/common/actor.pxd @@ -156,7 +156,7 @@ cdef class Actor(Component): cpdef void subscribe_ticker(self, InstrumentId instrument_id, ClientId client_id=*) cpdef void subscribe_quote_ticks(self, InstrumentId instrument_id, ClientId client_id=*) cpdef void subscribe_trade_ticks(self, InstrumentId instrument_id, ClientId client_id=*) - cpdef void subscribe_bars(self, BarType bar_type, ClientId client_id=*) + cpdef void subscribe_bars(self, BarType bar_type, ClientId client_id=*, bint await_partial=*) cpdef void subscribe_venue_status(self, Venue venue, ClientId client_id=*) cpdef void subscribe_instrument_status(self, InstrumentId instrument_id, ClientId client_id=*) cpdef void subscribe_instrument_close(self, InstrumentId instrument_id, ClientId client_id=*) diff --git a/nautilus_trader/common/actor.pyx b/nautilus_trader/common/actor.pyx index cb36c8725740..b4969fe5c179 100644 --- a/nautilus_trader/common/actor.pyx +++ b/nautilus_trader/common/actor.pyx @@ -1420,7 +1420,12 @@ cdef class Actor(Component): self._send_data_cmd(command) - cpdef void subscribe_bars(self, BarType bar_type, ClientId client_id = None): + cpdef void subscribe_bars( + self, + BarType bar_type, + ClientId client_id = None, + bint await_partial = False, + ): """ Subscribe to streaming `Bar` data for the given bar type. @@ -1431,6 +1436,9 @@ cdef class Actor(Component): client_id : ClientId, optional The specific client ID for the command. If ``None`` then will be inferred from the venue in the instrument ID. + await_partial : bool, default False + If the bar aggregator should await the arrival of a historical partial bar prior + to activaely aggregating new bars. """ Condition.not_none(bar_type, "bar_type") @@ -1441,10 +1449,15 @@ cdef class Actor(Component): handler=self.handle_bar, ) + cdef dict metadata = { + "bar_type": bar_type, + "await_partial": await_partial, + } + cdef Subscribe command = Subscribe( client_id=client_id, venue=bar_type.instrument_id.venue, - data_type=DataType(Bar, metadata={"bar_type": bar_type}), + data_type=DataType(Bar, metadata=metadata), command_id=UUID4(), ts_init=self._clock.timestamp_ns(), ) diff --git a/nautilus_trader/data/aggregation.pxd b/nautilus_trader/data/aggregation.pxd index fe43dda4d91f..4464ab183271 100644 --- a/nautilus_trader/data/aggregation.pxd +++ b/nautilus_trader/data/aggregation.pxd @@ -62,6 +62,7 @@ cdef class BarAggregator: cdef LoggerAdapter _log cdef BarBuilder _builder cdef object _handler + cdef bint _await_partial cdef readonly BarType bar_type """The aggregators bar type.\n\n:returns: `BarType`""" diff --git a/nautilus_trader/data/aggregation.pyx b/nautilus_trader/data/aggregation.pyx index c5a8ee7a6294..eef26fc37025 100644 --- a/nautilus_trader/data/aggregation.pyx +++ b/nautilus_trader/data/aggregation.pyx @@ -237,6 +237,8 @@ cdef class BarAggregator: The bar handler for the aggregator. logger : Logger The logger for the aggregator. + await_partial : bool, default False + If the aggregator should await an initial partial bar prior to aggregating. Raises ------ @@ -250,11 +252,13 @@ cdef class BarAggregator: BarType bar_type not None, handler not None: Callable[[Bar], None], Logger logger not None, + bint await_partial = False, ): Condition.equal(instrument.id, bar_type.instrument_id, "instrument.id", "bar_type.instrument_id") self.bar_type = bar_type self._handler = handler + self._await_partial = await_partial self._log = LoggerAdapter( component_name=type(self).__name__, logger=logger, @@ -264,6 +268,9 @@ cdef class BarAggregator: bar_type=self.bar_type, ) + def set_await_partial(self, bint value): + self._await_partial = value + cpdef void handle_quote_tick(self, QuoteTick tick): """ Update the aggregator with the given tick. @@ -276,11 +283,12 @@ cdef class BarAggregator: """ Condition.not_none(tick, "tick") - self._apply_update( - price=tick.extract_price(self.bar_type.spec.price_type), - size=tick.extract_volume(self.bar_type.spec.price_type), - ts_event=tick.ts_event, - ) + if not self._await_partial: + self._apply_update( + price=tick.extract_price(self.bar_type.spec.price_type), + size=tick.extract_volume(self.bar_type.spec.price_type), + ts_event=tick.ts_event, + ) cpdef void handle_trade_tick(self, TradeTick tick): """ @@ -294,11 +302,12 @@ cdef class BarAggregator: """ Condition.not_none(tick, "tick") - self._apply_update( - price=tick.price, - size=tick.size, - ts_event=tick.ts_event, - ) + if not self._await_partial: + self._apply_update( + price=tick.price, + size=tick.size, + ts_event=tick.ts_event, + ) cpdef void set_partial(self, Bar partial_bar): """ diff --git a/nautilus_trader/data/engine.pxd b/nautilus_trader/data/engine.pxd index bdeaec7f51ee..87bba28901e5 100644 --- a/nautilus_trader/data/engine.pxd +++ b/nautilus_trader/data/engine.pxd @@ -123,7 +123,7 @@ cdef class DataEngine(Component): cpdef void _handle_subscribe_synthetic_quote_ticks(self, InstrumentId instrument_id) cpdef void _handle_subscribe_trade_ticks(self, MarketDataClient client, InstrumentId instrument_id) cpdef void _handle_subscribe_synthetic_trade_ticks(self, InstrumentId instrument_id) - cpdef void _handle_subscribe_bars(self, MarketDataClient client, BarType bar_type) + cpdef void _handle_subscribe_bars(self, MarketDataClient client, BarType bar_type, bint await_partial) cpdef void _handle_subscribe_data(self, DataClient client, DataType data_type) cpdef void _handle_subscribe_venue_status(self, MarketDataClient client, Venue venue) cpdef void _handle_subscribe_instrument_status(self, MarketDataClient client, InstrumentId instrument_id) @@ -167,7 +167,7 @@ cdef class DataEngine(Component): cpdef void _internal_update_instruments(self, list instruments) cpdef void _update_order_book(self, Data data) cpdef void _snapshot_order_book(self, TimeEvent snap_event) - cpdef void _start_bar_aggregator(self, MarketDataClient client, BarType bar_type) + cpdef void _start_bar_aggregator(self, MarketDataClient client, BarType bar_type, bint await_partial) cpdef void _stop_bar_aggregator(self, MarketDataClient client, BarType bar_type) cpdef void _update_synthetics_with_quote(self, list synthetics, QuoteTick update) cpdef void _update_synthetic_with_quote(self, SyntheticInstrument synthetic, QuoteTick update) diff --git a/nautilus_trader/data/engine.pyx b/nautilus_trader/data/engine.pyx index a0c24d6e31fa..c8f162e9babe 100644 --- a/nautilus_trader/data/engine.pyx +++ b/nautilus_trader/data/engine.pyx @@ -683,6 +683,7 @@ cdef class DataEngine(Component): self._handle_subscribe_bars( client, command.data_type.metadata.get("bar_type"), + command.data_type.metadata.get("await_partial"), ) elif command.data_type.type == VenueStatus: self._handle_subscribe_venue_status( @@ -979,6 +980,7 @@ cdef class DataEngine(Component): self, MarketDataClient client, BarType bar_type, + bint await_partial, ): Condition.not_none(client, "client") Condition.not_none(bar_type, "bar_type") @@ -986,7 +988,7 @@ cdef class DataEngine(Component): if bar_type.is_internally_aggregated(): # Internal aggregation if bar_type not in self._bar_aggregators: - self._start_bar_aggregator(client, bar_type) + self._start_bar_aggregator(client, bar_type, await_partial) else: # External aggregation if bar_type.instrument_id.is_synthetic(): @@ -1544,6 +1546,8 @@ cdef class DataEngine(Component): if partial is not None and partial.bar_type.is_internally_aggregated(): # Update partial time bar aggregator = self._bar_aggregators.get(partial.bar_type) + aggregator.set_await_partial(False) + if aggregator: self._log.debug(f"Applying partial bar {partial} for {partial.bar_type}.") aggregator.set_partial(partial) @@ -1599,7 +1603,12 @@ cdef class DataEngine(Component): f"no order book found, {snap_event}.", ) - cpdef void _start_bar_aggregator(self, MarketDataClient client, BarType bar_type): + cpdef void _start_bar_aggregator( + self, + MarketDataClient client, + BarType bar_type, + bint await_partial, + ): cdef Instrument instrument = self._cache.instrument(bar_type.instrument_id) if instrument is None: self._log.error( @@ -1646,6 +1655,9 @@ cdef class DataEngine(Component): f"not supported in open-source" # pragma: no cover (design-time error) ) + # Set if awaiting initial partial bar + aggregator.set_await_partial(await_partial) + # Add aggregator self._bar_aggregators[bar_type] = aggregator self._log.debug(f"Added {aggregator} for {bar_type} bars.")