Skip to content

Commit

Permalink
Add Binance initial internal aggregation feature
Browse files Browse the repository at this point in the history
  • Loading branch information
cjdsellers committed Oct 16, 2023
1 parent 37e6d8c commit 4f1727f
Show file tree
Hide file tree
Showing 2 changed files with 110 additions and 40 deletions.
148 changes: 109 additions & 39 deletions nautilus_trader/adapters/binance/common/data.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,13 +46,18 @@
from nautilus_trader.core.correctness import PyCondition
from nautilus_trader.core.datetime import secs_to_millis
from nautilus_trader.core.uuid import UUID4
from nautilus_trader.data.aggregation import TickBarAggregator
from nautilus_trader.data.aggregation import ValueBarAggregator
from nautilus_trader.data.aggregation import VolumeBarAggregator
from nautilus_trader.live.data_client import LiveMarketDataClient
from nautilus_trader.model.data import Bar
from nautilus_trader.model.data import BarType
from nautilus_trader.model.data import DataType
from nautilus_trader.model.data import OrderBookDelta
from nautilus_trader.model.data import OrderBookDeltas
from nautilus_trader.model.data import QuoteTick
from nautilus_trader.model.data import TradeTick
from nautilus_trader.model.enums import BarAggregation
from nautilus_trader.model.enums import BookType
from nautilus_trader.model.enums import PriceType
from nautilus_trader.model.identifiers import ClientId
Expand Down Expand Up @@ -386,7 +391,7 @@ async def _subscribe_bars(self, bar_type: BarType) -> None:
)
return

resolution = self._enum_parser.parse_internal_bar_agg(bar_type.spec.aggregation)
resolution = self._enum_parser.parse_nautilus_bar_aggregation(bar_type.spec.aggregation)
if self._binance_account_type.is_futures and resolution == "s":
self._log.error(
f"Cannot subscribe to {bar_type}. ",
Expand Down Expand Up @@ -437,7 +442,7 @@ async def _unsubscribe_bars(self, bar_type: BarType) -> None:
)
return

resolution = self._enum_parser.parse_internal_bar_agg(bar_type.spec.aggregation)
resolution = self._enum_parser.parse_nautilus_bar_aggregation(bar_type.spec.aggregation)
if self._binance_account_type.is_futures and resolution == "s":
self._log.error(
f"Cannot unsubscribe from {bar_type}. ",
Expand Down Expand Up @@ -536,44 +541,16 @@ async def _request_bars( # (too complex)
start: Optional[pd.Timestamp] = None,
end: Optional[pd.Timestamp] = None,
) -> None:
if limit == 0 or limit > 1000:
limit = 1000

if bar_type.is_internally_aggregated():
self._log.error(
f"Cannot request {bar_type}: "
f"only historical bars with EXTERNAL aggregation available from Binance.",
)
return

if not bar_type.spec.is_time_aggregated():
self._log.error(
f"Cannot request {bar_type}: only time bars are aggregated by Binance.",
)
return

resolution = self._enum_parser.parse_internal_bar_agg(bar_type.spec.aggregation)
if not self._binance_account_type.is_spot_or_margin and resolution == "s":
self._log.error(
f"Cannot request {bar_type}: ",
"second interval bars are not aggregated by Binance Futures.",
)
try:
interval = BinanceKlineInterval(f"{bar_type.spec.step}{resolution}")
except ValueError:
self._log.error(
f"Cannot create Binance Kline interval. {bar_type.spec.step}{resolution} "
"not supported.",
)
return

if bar_type.spec.price_type != PriceType.LAST:
self._log.error(
f"Cannot request {bar_type}: "
f"only historical bars for LAST price type available from Binance.",
)
return

if limit == 0 or limit > 1000:
limit = 1000

start_time_ms = None
if start is not None:
start_time_ms = secs_to_millis(start.timestamp())
Expand All @@ -582,17 +559,110 @@ async def _request_bars( # (too complex)
if end is not None:
end_time_ms = secs_to_millis(end.timestamp())

bars = await self._http_market.request_binance_bars(
bar_type=bar_type,
interval=interval,
if bar_type.is_externally_aggregated():
if not bar_type.spec.is_time_aggregated():
self._log.error(
f"Cannot request {bar_type}: only time bars are aggregated by Binance.",
)
return

resolution = self._enum_parser.parse_nautilus_bar_aggregation(bar_type.spec.aggregation)
if not self._binance_account_type.is_spot_or_margin and resolution == "s":
self._log.error(
f"Cannot request {bar_type}: ",
"second interval bars are not aggregated by Binance Futures.",
)
try:
interval = BinanceKlineInterval(f"{bar_type.spec.step}{resolution}")
except ValueError:
self._log.error(
f"Cannot create Binance Kline interval. {bar_type.spec.step}{resolution} "
"not supported.",
)
return
bars = await self._http_market.request_binance_bars(
bar_type=bar_type,
interval=interval,
start_time=start_time_ms,
end_time=end_time_ms,
limit=limit,
ts_init=self._clock.timestamp_ns(),
)
else:
bars = await self._aggregate_internal_from_agg_trade_ticks(
bar_type=bar_type,
start_time_ms=start_time_ms,
end_time_ms=end_time_ms,
limit=limit,
)

partial: Bar = bars.pop()
self._handle_bars(bar_type, bars, partial, correlation_id)

async def _aggregate_internal_from_agg_trade_ticks(
self,
bar_type: BarType,
start_time_ms: Optional[int],
end_time_ms: Optional[int],
limit: Optional[int],
) -> list[Bar]:
instrument = self._instrument_provider.find(bar_type.instrument_id)
if instrument is None:
self._log.error(
f"Cannot aggregate internal bars: instrument {bar_type.instrument_id} not found.",
)
return []

ticks = await self._http_market.request_agg_trade_ticks(
instrument_id=instrument.id,
start_time=start_time_ms,
end_time=end_time_ms,
limit=limit,
ts_init=self._clock.timestamp_ns(),
)

partial: BinanceBar = bars.pop()
self._handle_bars(bar_type, bars, partial, correlation_id)
bars: list[Bar] = []
if bar_type.spec.is_time_aggregated():
self._log.error("Internally aggregating historical time bars not yet supported.")
return bars
elif bar_type.spec.aggregation == BarAggregation.TICK:
aggregator = TickBarAggregator(
instrument=instrument,
bar_type=bar_type,
handler=bars.append,
logger=self._log.get_logger(),
)
elif bar_type.spec.aggregation == BarAggregation.VOLUME:
aggregator = VolumeBarAggregator(
instrument=instrument,
bar_type=bar_type,
handler=bars.append,
logger=self._log.get_logger(),
)
elif bar_type.spec.aggregation == BarAggregation.VALUE:
aggregator = ValueBarAggregator(
instrument=instrument,
bar_type=bar_type,
handler=bars.append,
logger=self._log.get_logger(),
)
else:
raise RuntimeError( # pragma: no cover (design-time error)
f"Cannot start aggregator: " # pragma: no cover (design-time error)
f"BarAggregation.{bar_type.spec.aggregation_string_c()} " # pragma: no cover (design-time error)
f"not supported in open-source", # pragma: no cover (design-time error)
)

for tick in ticks:
aggregator.handle_trade_tick(tick)

self._log.info(
f"Internally aggregated {len(ticks)} external trade ticks to {len(bars)} {bar_type} bars.",
LogColor.BLUE,
)

if limit:
bars = bars[:limit]
return bars

def _send_all_instruments_to_data_engine(self) -> None:
for instrument in self._instrument_provider.get_all().values():
Expand Down
2 changes: 1 addition & 1 deletion nautilus_trader/adapters/binance/common/enums.py
Original file line number Diff line number Diff line change
Expand Up @@ -566,7 +566,7 @@ def parse_binance_bar_agg(self, bar_agg: str) -> BarAggregation:
f"unrecognized Binance kline resolution, was {bar_agg}",
)

def parse_internal_bar_agg(self, bar_agg: BarAggregation) -> str:
def parse_nautilus_bar_aggregation(self, bar_agg: BarAggregation) -> str:
try:
return self.int_to_ext_bar_agg[bar_agg]
except KeyError:
Expand Down

0 comments on commit 4f1727f

Please sign in to comment.