Skip to content

Commit

Permalink
Add subscribe_bars await_partial option
Browse files Browse the repository at this point in the history
  • Loading branch information
cjdsellers committed Oct 20, 2023
1 parent 58cab99 commit 0082d06
Show file tree
Hide file tree
Showing 6 changed files with 52 additions and 17 deletions.
2 changes: 1 addition & 1 deletion nautilus_trader/common/actor.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ cdef class Actor(Component):
cpdef void subscribe_ticker(self, InstrumentId instrument_id, ClientId client_id=*)
cpdef void subscribe_quote_ticks(self, InstrumentId instrument_id, ClientId client_id=*)
cpdef void subscribe_trade_ticks(self, InstrumentId instrument_id, ClientId client_id=*)
cpdef void subscribe_bars(self, BarType bar_type, ClientId client_id=*)
cpdef void subscribe_bars(self, BarType bar_type, ClientId client_id=*, bint await_partial=*)
cpdef void subscribe_venue_status(self, Venue venue, ClientId client_id=*)
cpdef void subscribe_instrument_status(self, InstrumentId instrument_id, ClientId client_id=*)
cpdef void subscribe_instrument_close(self, InstrumentId instrument_id, ClientId client_id=*)
Expand Down
17 changes: 15 additions & 2 deletions nautilus_trader/common/actor.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -1420,7 +1420,12 @@ cdef class Actor(Component):

self._send_data_cmd(command)

cpdef void subscribe_bars(self, BarType bar_type, ClientId client_id = None):
cpdef void subscribe_bars(
self,
BarType bar_type,
ClientId client_id = None,
bint await_partial = False,
):
"""
Subscribe to streaming `Bar` data for the given bar type.
Expand All @@ -1431,6 +1436,9 @@ cdef class Actor(Component):
client_id : ClientId, optional
The specific client ID for the command.
If ``None`` then will be inferred from the venue in the instrument ID.
await_partial : bool, default False
If the bar aggregator should await the arrival of a historical partial bar prior
to activaely aggregating new bars.
"""
Condition.not_none(bar_type, "bar_type")
Expand All @@ -1441,10 +1449,15 @@ cdef class Actor(Component):
handler=self.handle_bar,
)

cdef dict metadata = {
"bar_type": bar_type,
"await_partial": await_partial,
}

cdef Subscribe command = Subscribe(
client_id=client_id,
venue=bar_type.instrument_id.venue,
data_type=DataType(Bar, metadata={"bar_type": bar_type}),
data_type=DataType(Bar, metadata=metadata),
command_id=UUID4(),
ts_init=self._clock.timestamp_ns(),
)
Expand Down
1 change: 1 addition & 0 deletions nautilus_trader/data/aggregation.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ cdef class BarAggregator:
cdef LoggerAdapter _log
cdef BarBuilder _builder
cdef object _handler
cdef bint _await_partial

cdef readonly BarType bar_type
"""The aggregators bar type.\n\n:returns: `BarType`"""
Expand Down
29 changes: 19 additions & 10 deletions nautilus_trader/data/aggregation.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,8 @@ cdef class BarAggregator:
The bar handler for the aggregator.
logger : Logger
The logger for the aggregator.
await_partial : bool, default False
If the aggregator should await an initial partial bar prior to aggregating.
Raises
------
Expand All @@ -250,11 +252,13 @@ cdef class BarAggregator:
BarType bar_type not None,
handler not None: Callable[[Bar], None],
Logger logger not None,
bint await_partial = False,
):
Condition.equal(instrument.id, bar_type.instrument_id, "instrument.id", "bar_type.instrument_id")

self.bar_type = bar_type
self._handler = handler
self._await_partial = await_partial
self._log = LoggerAdapter(
component_name=type(self).__name__,
logger=logger,
Expand All @@ -264,6 +268,9 @@ cdef class BarAggregator:
bar_type=self.bar_type,
)

def set_await_partial(self, bint value):
self._await_partial = value

cpdef void handle_quote_tick(self, QuoteTick tick):
"""
Update the aggregator with the given tick.
Expand All @@ -276,11 +283,12 @@ cdef class BarAggregator:
"""
Condition.not_none(tick, "tick")

self._apply_update(
price=tick.extract_price(self.bar_type.spec.price_type),
size=tick.extract_volume(self.bar_type.spec.price_type),
ts_event=tick.ts_event,
)
if not self._await_partial:
self._apply_update(
price=tick.extract_price(self.bar_type.spec.price_type),
size=tick.extract_volume(self.bar_type.spec.price_type),
ts_event=tick.ts_event,
)

cpdef void handle_trade_tick(self, TradeTick tick):
"""
Expand All @@ -294,11 +302,12 @@ cdef class BarAggregator:
"""
Condition.not_none(tick, "tick")

self._apply_update(
price=tick.price,
size=tick.size,
ts_event=tick.ts_event,
)
if not self._await_partial:
self._apply_update(
price=tick.price,
size=tick.size,
ts_event=tick.ts_event,
)

cpdef void set_partial(self, Bar partial_bar):
"""
Expand Down
4 changes: 2 additions & 2 deletions nautilus_trader/data/engine.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ cdef class DataEngine(Component):
cpdef void _handle_subscribe_synthetic_quote_ticks(self, InstrumentId instrument_id)
cpdef void _handle_subscribe_trade_ticks(self, MarketDataClient client, InstrumentId instrument_id)
cpdef void _handle_subscribe_synthetic_trade_ticks(self, InstrumentId instrument_id)
cpdef void _handle_subscribe_bars(self, MarketDataClient client, BarType bar_type)
cpdef void _handle_subscribe_bars(self, MarketDataClient client, BarType bar_type, bint await_partial)
cpdef void _handle_subscribe_data(self, DataClient client, DataType data_type)
cpdef void _handle_subscribe_venue_status(self, MarketDataClient client, Venue venue)
cpdef void _handle_subscribe_instrument_status(self, MarketDataClient client, InstrumentId instrument_id)
Expand Down Expand Up @@ -167,7 +167,7 @@ cdef class DataEngine(Component):
cpdef void _internal_update_instruments(self, list instruments)
cpdef void _update_order_book(self, Data data)
cpdef void _snapshot_order_book(self, TimeEvent snap_event)
cpdef void _start_bar_aggregator(self, MarketDataClient client, BarType bar_type)
cpdef void _start_bar_aggregator(self, MarketDataClient client, BarType bar_type, bint await_partial)
cpdef void _stop_bar_aggregator(self, MarketDataClient client, BarType bar_type)
cpdef void _update_synthetics_with_quote(self, list synthetics, QuoteTick update)
cpdef void _update_synthetic_with_quote(self, SyntheticInstrument synthetic, QuoteTick update)
Expand Down
16 changes: 14 additions & 2 deletions nautilus_trader/data/engine.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -683,6 +683,7 @@ cdef class DataEngine(Component):
self._handle_subscribe_bars(
client,
command.data_type.metadata.get("bar_type"),
command.data_type.metadata.get("await_partial"),
)
elif command.data_type.type == VenueStatus:
self._handle_subscribe_venue_status(
Expand Down Expand Up @@ -979,14 +980,15 @@ cdef class DataEngine(Component):
self,
MarketDataClient client,
BarType bar_type,
bint await_partial,
):
Condition.not_none(client, "client")
Condition.not_none(bar_type, "bar_type")

if bar_type.is_internally_aggregated():
# Internal aggregation
if bar_type not in self._bar_aggregators:
self._start_bar_aggregator(client, bar_type)
self._start_bar_aggregator(client, bar_type, await_partial)
else:
# External aggregation
if bar_type.instrument_id.is_synthetic():
Expand Down Expand Up @@ -1544,6 +1546,8 @@ cdef class DataEngine(Component):
if partial is not None and partial.bar_type.is_internally_aggregated():
# Update partial time bar
aggregator = self._bar_aggregators.get(partial.bar_type)
aggregator.set_await_partial(False)

if aggregator:
self._log.debug(f"Applying partial bar {partial} for {partial.bar_type}.")
aggregator.set_partial(partial)
Expand Down Expand Up @@ -1599,7 +1603,12 @@ cdef class DataEngine(Component):
f"no order book found, {snap_event}.",
)

cpdef void _start_bar_aggregator(self, MarketDataClient client, BarType bar_type):
cpdef void _start_bar_aggregator(
self,
MarketDataClient client,
BarType bar_type,
bint await_partial,
):
cdef Instrument instrument = self._cache.instrument(bar_type.instrument_id)
if instrument is None:
self._log.error(
Expand Down Expand Up @@ -1646,6 +1655,9 @@ cdef class DataEngine(Component):
f"not supported in open-source" # pragma: no cover (design-time error)
)

# Set if awaiting initial partial bar
aggregator.set_await_partial(await_partial)

# Add aggregator
self._bar_aggregators[bar_type] = aggregator
self._log.debug(f"Added {aggregator} for {bar_type} bars.")
Expand Down

0 comments on commit 0082d06

Please sign in to comment.