From dcea0cfa7973693e9a9e161d9a62acc16d74ab2e Mon Sep 17 00:00:00 2001 From: Chris Sellers Date: Thu, 19 Oct 2023 19:20:50 +1100 Subject: [PATCH] Improve Binance internal bar aggregation --- RELEASES.md | 2 +- .../adapters/binance/common/data.py | 173 +++++++++++++++++- 2 files changed, 168 insertions(+), 7 deletions(-) diff --git a/RELEASES.md b/RELEASES.md index 7d4c3a435731..5c42c1e218ff 100644 --- a/RELEASES.md +++ b/RELEASES.md @@ -11,7 +11,7 @@ This will be the final release with support for Python 3.9. - Added `Cache.is_order_pending_cancel_local(...)` (tracks local orders in cancel transition) - Added `BinanceTimeInForce.GTD` enum member (futures only) - Added Binance Futures support for GTD orders -- Added Binance internal bar aggregation inference from aggregated trade ticks +- Added Binance internal bar aggregation inference from aggregated trade ticks or 1-MINUTE bars (depending on lookback window) - Added `BinanceExecClientConfig.use_gtd` option (to remap to GTC and locally manage GTD orders) - Added package version check for `nautilus_ibapi`, thanks @rsmb7z - Added `RiskEngine` min/max instrument notional limit checks diff --git a/nautilus_trader/adapters/binance/common/data.py b/nautilus_trader/adapters/binance/common/data.py index b246e69dc155..b2d49b2542dd 100644 --- a/nautilus_trader/adapters/binance/common/data.py +++ b/nautilus_trader/adapters/binance/common/data.py @@ -14,6 +14,8 @@ # ------------------------------------------------------------------------------------------------- import asyncio +import decimal +from decimal import Decimal from typing import Optional, Union import msgspec @@ -46,24 +48,30 @@ from nautilus_trader.core.correctness import PyCondition from nautilus_trader.core.datetime import secs_to_millis from nautilus_trader.core.uuid import UUID4 +from nautilus_trader.data.aggregation import BarAggregator from nautilus_trader.data.aggregation import TickBarAggregator from nautilus_trader.data.aggregation import ValueBarAggregator from nautilus_trader.data.aggregation import VolumeBarAggregator from nautilus_trader.live.data_client import LiveMarketDataClient from nautilus_trader.model.data import Bar +from nautilus_trader.model.data import BarSpecification from nautilus_trader.model.data import BarType from nautilus_trader.model.data import DataType from nautilus_trader.model.data import OrderBookDelta from nautilus_trader.model.data import OrderBookDeltas from nautilus_trader.model.data import QuoteTick from nautilus_trader.model.data import TradeTick +from nautilus_trader.model.enums import AggregationSource +from nautilus_trader.model.enums import AggressorSide from nautilus_trader.model.enums import BarAggregation from nautilus_trader.model.enums import BookType from nautilus_trader.model.enums import PriceType from nautilus_trader.model.identifiers import ClientId from nautilus_trader.model.identifiers import InstrumentId from nautilus_trader.model.identifiers import Symbol +from nautilus_trader.model.identifiers import TradeId from nautilus_trader.model.instruments import Instrument +from nautilus_trader.model.objects import Quantity from nautilus_trader.msgbus.bus import MessageBus @@ -594,16 +602,169 @@ async def _request_bars( # (too complex) LogColor.BLUE, ) else: - bars = await self._aggregate_internal_from_agg_trade_ticks( - bar_type=bar_type, - start_time_ms=start_time_ms, - end_time_ms=end_time_ms, - limit=limit if limit > 0 else None, - ) + if start and start < self._clock.utc_now() - pd.Timedelta(days=1): + bars = await self._aggregate_internal_from_minute_bars( + bar_type=bar_type, + start_time_ms=start_time_ms, + end_time_ms=end_time_ms, + limit=limit if limit > 0 else None, + ) + else: + bars = await self._aggregate_internal_from_agg_trade_ticks( + bar_type=bar_type, + start_time_ms=start_time_ms, + end_time_ms=end_time_ms, + limit=limit if limit > 0 else None, + ) partial: Bar = bars.pop() self._handle_bars(bar_type, bars, partial, correlation_id) + async def _aggregate_internal_from_minute_bars( + self, + bar_type: BarType, + start_time_ms: Optional[int], + end_time_ms: Optional[int], + limit: Optional[int], + ) -> list[Bar]: + instrument = self._instrument_provider.find(bar_type.instrument_id) + if instrument is None: + self._log.error( + f"Cannot aggregate internal bars: instrument {bar_type.instrument_id} not found.", + ) + return [] + + self._log.info("Requesting 1-MINUTE Binance bars to infer INTERNAL bars...", LogColor.BLUE) + + binance_bars = await self._http_market.request_binance_bars( + bar_type=BarType( + bar_type.instrument_id, + BarSpecification(1, BarAggregation.MINUTE, PriceType.LAST), + AggregationSource.EXTERNAL, + ), + interval=BinanceKlineInterval.MINUTE_1, + start_time=start_time_ms, + end_time=end_time_ms, + ts_init=self._clock.timestamp_ns(), + limit=limit, + ) + + quantize_value = Decimal(f"1e-{instrument.size_precision}") + + bars: list[Bar] = [] + if bar_type.spec.aggregation == BarAggregation.TICK: + aggregator = TickBarAggregator( + instrument=instrument, + bar_type=bar_type, + handler=bars.append, + logger=self._log.get_logger(), + ) + elif bar_type.spec.aggregation == BarAggregation.VOLUME: + aggregator = VolumeBarAggregator( + instrument=instrument, + bar_type=bar_type, + handler=bars.append, + logger=self._log.get_logger(), + ) + elif bar_type.spec.aggregation == BarAggregation.VALUE: + aggregator = ValueBarAggregator( + instrument=instrument, + bar_type=bar_type, + handler=bars.append, + logger=self._log.get_logger(), + ) + else: + raise RuntimeError( # pragma: no cover (design-time error) + f"Cannot start aggregator: " # pragma: no cover (design-time error) + f"BarAggregation.{bar_type.spec.aggregation_string_c()} " # pragma: no cover (design-time error) + f"not supported in open-source", # pragma: no cover (design-time error) + ) + + for binance_bar in binance_bars: + if binance_bar.count == 0: + continue + self._aggregate_bar_to_trade_ticks( + instrument=instrument, + aggregator=aggregator, + binance_bar=binance_bar, + quantize_value=quantize_value, + ) + + self._log.info( + f"Inferred {len(bars)} {bar_type} bars aggregated from {len(binance_bars)} 1-MINUTE Binance bars.", + LogColor.BLUE, + ) + + if limit: + bars = bars[:limit] + return bars + + def _aggregate_bar_to_trade_ticks( + self, + instrument: Instrument, + aggregator: BarAggregator, + binance_bar: BinanceBar, + quantize_value: Decimal, + ) -> None: + volume = binance_bar.volume.as_decimal() + size_part: Decimal = (volume / (4 * binance_bar.count)).quantize( + quantize_value, + rounding=decimal.ROUND_DOWN, + ) + remainder: Decimal = volume - (size_part * 4 * binance_bar.count) + + size = Quantity(size_part, instrument.size_precision) + + for i in range(binance_bar.count): + open = TradeTick( + instrument_id=instrument.id, + price=binance_bar.open, + size=size, + aggressor_side=AggressorSide.NO_AGGRESSOR, + trade_id=TradeId("NULL"), # N/A + ts_event=binance_bar.ts_event, + ts_init=binance_bar.ts_event, + ) + + high = TradeTick( + instrument_id=instrument.id, + price=binance_bar.high, + size=size, + aggressor_side=AggressorSide.NO_AGGRESSOR, + trade_id=TradeId("NULL"), # N/A + ts_event=binance_bar.ts_event, + ts_init=binance_bar.ts_event, + ) + + low = TradeTick( + instrument_id=instrument.id, + price=binance_bar.low, + size=size, + aggressor_side=AggressorSide.NO_AGGRESSOR, + trade_id=TradeId("NULL"), # N/A + ts_event=binance_bar.ts_event, + ts_init=binance_bar.ts_event, + ) + + close_size = size + if i == binance_bar.count - 1: + close_size = Quantity(size_part + remainder, instrument.size_precision) + + close = TradeTick( + instrument_id=instrument.id, + price=binance_bar.close, + size=close_size, + aggressor_side=AggressorSide.NO_AGGRESSOR, + trade_id=TradeId("NULL"), # N/A + ts_event=binance_bar.ts_event, + ts_init=binance_bar.ts_event, + ) + + aggregator.handle_trade_tick(open) + aggregator.handle_trade_tick(high) + aggregator.handle_trade_tick(low) + aggregator.handle_trade_tick(close) + async def _aggregate_internal_from_agg_trade_ticks( self, bar_type: BarType,