From a05c0bcc7861a0ab3eb324c303688c71e93f17f6 Mon Sep 17 00:00:00 2001 From: Chris Sellers Date: Mon, 25 Mar 2024 19:36:46 +1100 Subject: [PATCH] Implement Bybit quote ticks --- nautilus_trader/adapters/bybit/data.py | 111 ++++++++++++------ nautilus_trader/adapters/bybit/execution.py | 5 +- nautilus_trader/adapters/bybit/schemas/ws.py | 45 ++++++- .../adapters/bybit/websocket/client.py | 20 ++++ 4 files changed, 138 insertions(+), 43 deletions(-) diff --git a/nautilus_trader/adapters/bybit/data.py b/nautilus_trader/adapters/bybit/data.py index d72202926298..4f1147d37f2c 100644 --- a/nautilus_trader/adapters/bybit/data.py +++ b/nautilus_trader/adapters/bybit/data.py @@ -37,6 +37,7 @@ from nautilus_trader.common.component import LiveClock from nautilus_trader.common.component import MessageBus from nautilus_trader.common.enums import LogColor +from nautilus_trader.core.datetime import millis_to_nanos from nautilus_trader.core.datetime import secs_to_millis from nautilus_trader.core.message import Request from nautilus_trader.core.nautilus_pyo3 import Symbol @@ -47,12 +48,15 @@ from nautilus_trader.model.data import BarType from nautilus_trader.model.data import CustomData from nautilus_trader.model.data import DataType +from nautilus_trader.model.data import QuoteTick from nautilus_trader.model.data import TradeTick from nautilus_trader.model.enums import PriceType from nautilus_trader.model.identifiers import ClientId from nautilus_trader.model.identifiers import InstrumentId from nautilus_trader.model.identifiers import Venue from nautilus_trader.model.instruments import Instrument +from nautilus_trader.model.objects import Price +from nautilus_trader.model.objects import Quantity class BybitDataClient(LiveMarketDataClient): @@ -108,6 +112,7 @@ def __init__( # Hot cache self._instrument_ids: dict[str, InstrumentId] = {} + self._last_quotes: dict[InstrumentId, QuoteTick] = {} # HTTP API self._http_market = BybitMarketHttpAPI( @@ -235,6 +240,20 @@ async def _subscribe_trade_ticks(self, instrument_id: InstrumentId) -> None: await ws_client.subscribe_trades(bybit_symbol.raw_symbol) self._log.info(f"Subscribed {instrument_id} trade ticks.", LogColor.BLUE) + async def _unsubscribe_quote_ticks(self, instrument_id: InstrumentId) -> None: + bybit_symbol = BybitSymbol(instrument_id.symbol.value) + assert bybit_symbol # type checking + ws_client = self._ws_clients[bybit_symbol.instrument_type] + await ws_client.unsubscribe_tickers(bybit_symbol.raw_symbol) + self._log.info(f"Unsubscribed {instrument_id} quote ticks.", LogColor.BLUE) + + async def _unsubscribe_trade_ticks(self, instrument_id: InstrumentId) -> None: + bybit_symbol = BybitSymbol(instrument_id.symbol.value) + assert bybit_symbol # type checking + ws_client = self._ws_clients[bybit_symbol.instrument_type] + await ws_client.unsubscribe_trades(bybit_symbol.raw_symbol) + self._log.info(f"Unsubscribed {instrument_id} trade ticks.", LogColor.BLUE) + def _handle_ws_message(self, instrument_type: BybitInstrumentType, raw: bytes) -> None: try: ws_message = self._decoder_ws_msg_general.decode(raw) @@ -247,36 +266,6 @@ def _handle_ws_message(self, instrument_type: BybitInstrumentType, raw: bytes) - except Exception as e: self._log.error(f"Failed to parse ticker: {raw.decode()} with error {e}") - def _handle_ticker(self, instrument_type: BybitInstrumentType, raw: bytes) -> None: - try: - msg = self._decoders["ticker"].decode(raw) - self._log.warning(f"{msg}") - - for quote_tick in msg.data: - symbol = quote_tick.s + f"-{instrument_type.value.upper()}" - instrument_id: InstrumentId = self._get_cached_instrument_id(symbol) - quote_tick = quote_tick.parse_to_quote_tick( - instrument_id, - self._clock.timestamp_ns(), - ) - self._handle_data(quote_tick) - except Exception as e: - self._log.error(f"Failed to parse ticker: {raw.decode()} with error {e}") - - def _handle_trade(self, instrument_type: BybitInstrumentType, raw: bytes) -> None: - try: - msg = self._decoders["trade"].decode(raw) - for trade in msg.data: - symbol = trade.s + f"-{instrument_type.value.upper()}" - instrument_id: InstrumentId = self._get_cached_instrument_id(symbol) - trade_tick: TradeTick = trade.parse_to_trade_tick( - instrument_id, - self._clock.timestamp_ns(), - ) - self._handle_data(trade_tick) - except Exception as e: - self._log.error(f"Failed to parse trade tick: {raw.decode()} with error {e}") - def _topic_check(self, instrument_type: BybitInstrumentType, topic: str, raw: bytes) -> None: if "publicTrade" in topic: self._handle_trade(instrument_type, raw) @@ -292,6 +281,63 @@ def _get_cached_instrument_id(self, symbol: str) -> InstrumentId: nautilus_instrument_id: InstrumentId = bybit_symbol.parse_as_nautilus() return nautilus_instrument_id + def _handle_ticker(self, instrument_type: BybitInstrumentType, raw: bytes) -> None: + msg = self._decoders["ticker"].decode(raw) + try: + symbol = msg.data.symbol + f"-{instrument_type.value.upper()}" + instrument_id: InstrumentId = self._get_cached_instrument_id(symbol) + last_quote = self._last_quotes.get(instrument_id) + + quote = QuoteTick( + instrument_id=instrument_id, + bid_price=( + Price.from_str(msg.data.bid1Price) + if msg.data.bid1Price or last_quote is None + else last_quote.bid_price + ), + ask_price=( + Price.from_str(msg.data.ask1Price) + if msg.data.ask1Price or last_quote is None + else last_quote.ask_price + ), + bid_size=( + Quantity.from_str(msg.data.bid1Size) + if msg.data.bid1Size or last_quote is None + else last_quote.bid_size + ), + ask_size=( + Quantity.from_str(msg.data.ask1Size) + if msg.data.ask1Size or last_quote is None + else last_quote.ask_size + ), + ts_event=millis_to_nanos(msg.ts), + ts_init=self._clock.timestamp_ns(), + ) + + self._last_quotes[quote.instrument_id] = quote + self._handle_data(quote) + except Exception as e: + self._log.error(f"Failed to parse ticker: {msg} with error {e}") + + def _handle_trade(self, instrument_type: BybitInstrumentType, raw: bytes) -> None: + msg = self._decoders["trade"].decode(raw) + try: + for data in msg.data: + symbol = data.s + f"-{instrument_type.value.upper()}" + instrument_id: InstrumentId = self._get_cached_instrument_id(symbol) + trade: TradeTick = data.parse_to_trade_tick( + instrument_id, + self._clock.timestamp_ns(), + ) + self._handle_data(trade) + except Exception as e: + self._log.error(f"Failed to parse trade tick: {msg} with error {e}") + + async def _request(self, data_type: DataType, correlation_id: UUID4) -> None: + if data_type.type == BybitTickerData: + symbol = data_type.metadata["symbol"] + await self._handle_ticker_data_request(symbol, correlation_id) + async def _request_instrument( self, instrument_id: InstrumentId, @@ -438,8 +484,3 @@ async def _handle_ticker_data_request(self, symbol: Symbol, correlation_id: UUID result, correlation_id, ) - - async def _request(self, data_type: DataType, correlation_id: UUID4) -> None: - if data_type.type == BybitTickerData: - symbol = data_type.metadata["symbol"] - await self._handle_ticker_data_request(symbol, correlation_id) diff --git a/nautilus_trader/adapters/bybit/execution.py b/nautilus_trader/adapters/bybit/execution.py index efb2c46dcc57..b684f2cd452b 100644 --- a/nautilus_trader/adapters/bybit/execution.py +++ b/nautilus_trader/adapters/bybit/execution.py @@ -54,6 +54,7 @@ from nautilus_trader.model.enums import OmsType from nautilus_trader.model.enums import OrderStatus from nautilus_trader.model.enums import OrderType +from nautilus_trader.model.enums import account_type_to_str from nautilus_trader.model.identifiers import AccountId from nautilus_trader.model.identifiers import ClientId from nautilus_trader.model.identifiers import ClientOrderId @@ -123,7 +124,7 @@ def __init__( # Configuration self._use_position_ids = config.use_position_ids - self._log.info(f"Account type: ${self.account_type}", LogColor.BLUE) + self._log.info(f"Account type: {account_type_to_str(self.account_type)}", LogColor.BLUE) self._instrument_types = instrument_types self._enum_parser = BybitEnumParser() @@ -423,7 +424,7 @@ def _handle_ws_message(self, raw: bytes) -> None: except Exception as e: ws_message_sub = self._decoder_ws_subscription.decode(raw) if ws_message_sub.success: - self._log.info(f"Subscribed to stream {ws_message.topic}", LogColor.BLUE) + self._log.debug("Subscribed to stream") else: self._log.error(f"Failed to subscribe. {e!s}") diff --git a/nautilus_trader/adapters/bybit/schemas/ws.py b/nautilus_trader/adapters/bybit/schemas/ws.py index 05f15ba87f49..20834f8ef57b 100644 --- a/nautilus_trader/adapters/bybit/schemas/ws.py +++ b/nautilus_trader/adapters/bybit/schemas/ws.py @@ -26,6 +26,7 @@ from nautilus_trader.core.datetime import millis_to_nanos from nautilus_trader.core.uuid import UUID4 from nautilus_trader.execution.reports import OrderStatusReport +from nautilus_trader.model.data import QuoteTick from nautilus_trader.model.data import TradeTick from nautilus_trader.model.enums import AggressorSide from nautilus_trader.model.identifiers import AccountId @@ -146,7 +147,7 @@ class BybitWsOrderbookSnapshotMsg(msgspec.Struct): class BybitWsTickerLinear(msgspec.Struct, omit_defaults=True, kw_only=True): symbol: str tickDirection: str | None = None - price24hPcnt: str + price24hPcnt: str | None = None lastPrice: str | None = None prevPrice24h: str | None = None highPrice24h: str | None = None @@ -159,11 +160,27 @@ class BybitWsTickerLinear(msgspec.Struct, omit_defaults=True, kw_only=True): turnover24h: str | None = None volume24h: str | None = None nextFundingTime: str | None = None - fundingRate: str - bid1Price: str - bid1Size: str - ask1Price: str - ask1Size: str + fundingRate: str | None = None + bid1Price: str | None = None + bid1Size: str | None = None + ask1Price: str | None = None + ask1Size: str | None = None + + def parse_to_quote_tick( + self, + instrument_id: InstrumentId, + ts_event: int, + ts_init: int, + ) -> QuoteTick: + return QuoteTick( + instrument_id=instrument_id, + bid_price=Price.from_str(self.bid1Price), + ask_price=Price.from_str(self.ask1Price), + bid_size=Quantity.from_str(self.bid1Size), + ask_size=Quantity.from_str(self.ask1Size), + ts_event=ts_event, + ts_init=ts_init, + ) class BybitWsTickerLinearMsg(msgspec.Struct): @@ -231,6 +248,22 @@ class BybitWsTickerOption(msgspec.Struct): predictedDeliveryPrice: str change24h: str + def parse_to_quote_tick( + self, + instrument_id: InstrumentId, + ts_event: int, + ts_init: int, + ) -> QuoteTick: + return QuoteTick( + instrument_id=instrument_id, + bid_price=Price.from_str(self.bidPrice), + ask_price=Price.from_str(self.askPrice), + bid_size=Quantity.from_str(self.bidSize), + ask_size=Quantity.from_str(self.askSize), + ts_event=ts_event, + ts_init=ts_init, + ) + class BybitWsTickerOptionMsg(msgspec.Struct): topic: str diff --git a/nautilus_trader/adapters/bybit/websocket/client.py b/nautilus_trader/adapters/bybit/websocket/client.py index 3484bd99087a..057dba6de6b3 100644 --- a/nautilus_trader/adapters/bybit/websocket/client.py +++ b/nautilus_trader/adapters/bybit/websocket/client.py @@ -94,6 +94,26 @@ async def subscribe_tickers(self, symbol: str) -> None: await self._client.send_text(json.dumps(sub)) self._subscriptions.append(subscription) + async def unsubscribe_trades(self, symbol: str) -> None: + if self._client is None: + self._log.warning("Cannot subscribe: not connected") + return + + subscription = f"publicTrade.{symbol}" + sub = {"op": "unsubscribe", "args": [subscription]} + await self._client.send_text(json.dumps(sub)) + self._subscriptions.remove(subscription) + + async def unsubscribe_tickers(self, symbol: str) -> None: + if self._client is None: + self._log.warning("Cannot subscribe: not connected") + return + + subscription = f"tickers.{symbol}" + sub = {"op": "unsubscribe", "args": [subscription]} + await self._client.send_text(json.dumps(sub)) + self._subscriptions.remove(subscription) + ################################################################################ # Private ################################################################################