Skip to content

Commit

Permalink
Improve Binance internal bar aggregation
Browse files Browse the repository at this point in the history
  • Loading branch information
cjdsellers committed Oct 19, 2023
1 parent a0c6b38 commit dcea0cf
Show file tree
Hide file tree
Showing 2 changed files with 168 additions and 7 deletions.
2 changes: 1 addition & 1 deletion RELEASES.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ This will be the final release with support for Python 3.9.
- Added `Cache.is_order_pending_cancel_local(...)` (tracks local orders in cancel transition)
- Added `BinanceTimeInForce.GTD` enum member (futures only)
- Added Binance Futures support for GTD orders
- Added Binance internal bar aggregation inference from aggregated trade ticks
- Added Binance internal bar aggregation inference from aggregated trade ticks or 1-MINUTE bars (depending on lookback window)
- Added `BinanceExecClientConfig.use_gtd` option (to remap to GTC and locally manage GTD orders)
- Added package version check for `nautilus_ibapi`, thanks @rsmb7z
- Added `RiskEngine` min/max instrument notional limit checks
Expand Down
173 changes: 167 additions & 6 deletions nautilus_trader/adapters/binance/common/data.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
# -------------------------------------------------------------------------------------------------

import asyncio
import decimal
from decimal import Decimal
from typing import Optional, Union

import msgspec
Expand Down Expand Up @@ -46,24 +48,30 @@
from nautilus_trader.core.correctness import PyCondition
from nautilus_trader.core.datetime import secs_to_millis
from nautilus_trader.core.uuid import UUID4
from nautilus_trader.data.aggregation import BarAggregator
from nautilus_trader.data.aggregation import TickBarAggregator
from nautilus_trader.data.aggregation import ValueBarAggregator
from nautilus_trader.data.aggregation import VolumeBarAggregator
from nautilus_trader.live.data_client import LiveMarketDataClient
from nautilus_trader.model.data import Bar
from nautilus_trader.model.data import BarSpecification
from nautilus_trader.model.data import BarType
from nautilus_trader.model.data import DataType
from nautilus_trader.model.data import OrderBookDelta
from nautilus_trader.model.data import OrderBookDeltas
from nautilus_trader.model.data import QuoteTick
from nautilus_trader.model.data import TradeTick
from nautilus_trader.model.enums import AggregationSource
from nautilus_trader.model.enums import AggressorSide
from nautilus_trader.model.enums import BarAggregation
from nautilus_trader.model.enums import BookType
from nautilus_trader.model.enums import PriceType
from nautilus_trader.model.identifiers import ClientId
from nautilus_trader.model.identifiers import InstrumentId
from nautilus_trader.model.identifiers import Symbol
from nautilus_trader.model.identifiers import TradeId
from nautilus_trader.model.instruments import Instrument
from nautilus_trader.model.objects import Quantity
from nautilus_trader.msgbus.bus import MessageBus


Expand Down Expand Up @@ -594,16 +602,169 @@ async def _request_bars( # (too complex)
LogColor.BLUE,
)
else:
bars = await self._aggregate_internal_from_agg_trade_ticks(
bar_type=bar_type,
start_time_ms=start_time_ms,
end_time_ms=end_time_ms,
limit=limit if limit > 0 else None,
)
if start and start < self._clock.utc_now() - pd.Timedelta(days=1):
bars = await self._aggregate_internal_from_minute_bars(
bar_type=bar_type,
start_time_ms=start_time_ms,
end_time_ms=end_time_ms,
limit=limit if limit > 0 else None,
)
else:
bars = await self._aggregate_internal_from_agg_trade_ticks(
bar_type=bar_type,
start_time_ms=start_time_ms,
end_time_ms=end_time_ms,
limit=limit if limit > 0 else None,
)

partial: Bar = bars.pop()
self._handle_bars(bar_type, bars, partial, correlation_id)

async def _aggregate_internal_from_minute_bars(
self,
bar_type: BarType,
start_time_ms: Optional[int],
end_time_ms: Optional[int],
limit: Optional[int],
) -> list[Bar]:
instrument = self._instrument_provider.find(bar_type.instrument_id)
if instrument is None:
self._log.error(
f"Cannot aggregate internal bars: instrument {bar_type.instrument_id} not found.",
)
return []

self._log.info("Requesting 1-MINUTE Binance bars to infer INTERNAL bars...", LogColor.BLUE)

binance_bars = await self._http_market.request_binance_bars(
bar_type=BarType(
bar_type.instrument_id,
BarSpecification(1, BarAggregation.MINUTE, PriceType.LAST),
AggregationSource.EXTERNAL,
),
interval=BinanceKlineInterval.MINUTE_1,
start_time=start_time_ms,
end_time=end_time_ms,
ts_init=self._clock.timestamp_ns(),
limit=limit,
)

quantize_value = Decimal(f"1e-{instrument.size_precision}")

bars: list[Bar] = []
if bar_type.spec.aggregation == BarAggregation.TICK:
aggregator = TickBarAggregator(
instrument=instrument,
bar_type=bar_type,
handler=bars.append,
logger=self._log.get_logger(),
)
elif bar_type.spec.aggregation == BarAggregation.VOLUME:
aggregator = VolumeBarAggregator(
instrument=instrument,
bar_type=bar_type,
handler=bars.append,
logger=self._log.get_logger(),
)
elif bar_type.spec.aggregation == BarAggregation.VALUE:
aggregator = ValueBarAggregator(
instrument=instrument,
bar_type=bar_type,
handler=bars.append,
logger=self._log.get_logger(),
)
else:
raise RuntimeError( # pragma: no cover (design-time error)
f"Cannot start aggregator: " # pragma: no cover (design-time error)
f"BarAggregation.{bar_type.spec.aggregation_string_c()} " # pragma: no cover (design-time error)
f"not supported in open-source", # pragma: no cover (design-time error)
)

for binance_bar in binance_bars:
if binance_bar.count == 0:
continue
self._aggregate_bar_to_trade_ticks(
instrument=instrument,
aggregator=aggregator,
binance_bar=binance_bar,
quantize_value=quantize_value,
)

self._log.info(
f"Inferred {len(bars)} {bar_type} bars aggregated from {len(binance_bars)} 1-MINUTE Binance bars.",
LogColor.BLUE,
)

if limit:
bars = bars[:limit]
return bars

def _aggregate_bar_to_trade_ticks(
self,
instrument: Instrument,
aggregator: BarAggregator,
binance_bar: BinanceBar,
quantize_value: Decimal,
) -> None:
volume = binance_bar.volume.as_decimal()
size_part: Decimal = (volume / (4 * binance_bar.count)).quantize(
quantize_value,
rounding=decimal.ROUND_DOWN,
)
remainder: Decimal = volume - (size_part * 4 * binance_bar.count)

size = Quantity(size_part, instrument.size_precision)

for i in range(binance_bar.count):
open = TradeTick(
instrument_id=instrument.id,
price=binance_bar.open,
size=size,
aggressor_side=AggressorSide.NO_AGGRESSOR,
trade_id=TradeId("NULL"), # N/A
ts_event=binance_bar.ts_event,
ts_init=binance_bar.ts_event,
)

high = TradeTick(
instrument_id=instrument.id,
price=binance_bar.high,
size=size,
aggressor_side=AggressorSide.NO_AGGRESSOR,
trade_id=TradeId("NULL"), # N/A
ts_event=binance_bar.ts_event,
ts_init=binance_bar.ts_event,
)

low = TradeTick(
instrument_id=instrument.id,
price=binance_bar.low,
size=size,
aggressor_side=AggressorSide.NO_AGGRESSOR,
trade_id=TradeId("NULL"), # N/A
ts_event=binance_bar.ts_event,
ts_init=binance_bar.ts_event,
)

close_size = size
if i == binance_bar.count - 1:
close_size = Quantity(size_part + remainder, instrument.size_precision)

close = TradeTick(
instrument_id=instrument.id,
price=binance_bar.close,
size=close_size,
aggressor_side=AggressorSide.NO_AGGRESSOR,
trade_id=TradeId("NULL"), # N/A
ts_event=binance_bar.ts_event,
ts_init=binance_bar.ts_event,
)

aggregator.handle_trade_tick(open)
aggregator.handle_trade_tick(high)
aggregator.handle_trade_tick(low)
aggregator.handle_trade_tick(close)

async def _aggregate_internal_from_agg_trade_ticks(
self,
bar_type: BarType,
Expand Down

0 comments on commit dcea0cf

Please sign in to comment.