Skip to content

Commit

Permalink
Implement Bybit quote ticks
Browse files Browse the repository at this point in the history
  • Loading branch information
cjdsellers committed Mar 25, 2024
1 parent 8c63527 commit a05c0bc
Show file tree
Hide file tree
Showing 4 changed files with 138 additions and 43 deletions.
111 changes: 76 additions & 35 deletions nautilus_trader/adapters/bybit/data.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
from nautilus_trader.common.component import LiveClock
from nautilus_trader.common.component import MessageBus
from nautilus_trader.common.enums import LogColor
from nautilus_trader.core.datetime import millis_to_nanos
from nautilus_trader.core.datetime import secs_to_millis
from nautilus_trader.core.message import Request
from nautilus_trader.core.nautilus_pyo3 import Symbol
Expand All @@ -47,12 +48,15 @@
from nautilus_trader.model.data import BarType
from nautilus_trader.model.data import CustomData
from nautilus_trader.model.data import DataType
from nautilus_trader.model.data import QuoteTick
from nautilus_trader.model.data import TradeTick
from nautilus_trader.model.enums import PriceType
from nautilus_trader.model.identifiers import ClientId
from nautilus_trader.model.identifiers import InstrumentId
from nautilus_trader.model.identifiers import Venue
from nautilus_trader.model.instruments import Instrument
from nautilus_trader.model.objects import Price
from nautilus_trader.model.objects import Quantity


class BybitDataClient(LiveMarketDataClient):
Expand Down Expand Up @@ -108,6 +112,7 @@ def __init__(

# Hot cache
self._instrument_ids: dict[str, InstrumentId] = {}
self._last_quotes: dict[InstrumentId, QuoteTick] = {}

# HTTP API
self._http_market = BybitMarketHttpAPI(
Expand Down Expand Up @@ -235,6 +240,20 @@ async def _subscribe_trade_ticks(self, instrument_id: InstrumentId) -> None:
await ws_client.subscribe_trades(bybit_symbol.raw_symbol)
self._log.info(f"Subscribed {instrument_id} trade ticks.", LogColor.BLUE)

async def _unsubscribe_quote_ticks(self, instrument_id: InstrumentId) -> None:
bybit_symbol = BybitSymbol(instrument_id.symbol.value)
assert bybit_symbol # type checking
ws_client = self._ws_clients[bybit_symbol.instrument_type]
await ws_client.unsubscribe_tickers(bybit_symbol.raw_symbol)
self._log.info(f"Unsubscribed {instrument_id} quote ticks.", LogColor.BLUE)

async def _unsubscribe_trade_ticks(self, instrument_id: InstrumentId) -> None:
bybit_symbol = BybitSymbol(instrument_id.symbol.value)
assert bybit_symbol # type checking
ws_client = self._ws_clients[bybit_symbol.instrument_type]
await ws_client.unsubscribe_trades(bybit_symbol.raw_symbol)
self._log.info(f"Unsubscribed {instrument_id} trade ticks.", LogColor.BLUE)

def _handle_ws_message(self, instrument_type: BybitInstrumentType, raw: bytes) -> None:
try:
ws_message = self._decoder_ws_msg_general.decode(raw)
Expand All @@ -247,36 +266,6 @@ def _handle_ws_message(self, instrument_type: BybitInstrumentType, raw: bytes) -
except Exception as e:
self._log.error(f"Failed to parse ticker: {raw.decode()} with error {e}")

def _handle_ticker(self, instrument_type: BybitInstrumentType, raw: bytes) -> None:
try:
msg = self._decoders["ticker"].decode(raw)
self._log.warning(f"{msg}")

for quote_tick in msg.data:
symbol = quote_tick.s + f"-{instrument_type.value.upper()}"
instrument_id: InstrumentId = self._get_cached_instrument_id(symbol)
quote_tick = quote_tick.parse_to_quote_tick(
instrument_id,
self._clock.timestamp_ns(),
)
self._handle_data(quote_tick)
except Exception as e:
self._log.error(f"Failed to parse ticker: {raw.decode()} with error {e}")

def _handle_trade(self, instrument_type: BybitInstrumentType, raw: bytes) -> None:
try:
msg = self._decoders["trade"].decode(raw)
for trade in msg.data:
symbol = trade.s + f"-{instrument_type.value.upper()}"
instrument_id: InstrumentId = self._get_cached_instrument_id(symbol)
trade_tick: TradeTick = trade.parse_to_trade_tick(
instrument_id,
self._clock.timestamp_ns(),
)
self._handle_data(trade_tick)
except Exception as e:
self._log.error(f"Failed to parse trade tick: {raw.decode()} with error {e}")

def _topic_check(self, instrument_type: BybitInstrumentType, topic: str, raw: bytes) -> None:
if "publicTrade" in topic:
self._handle_trade(instrument_type, raw)
Expand All @@ -292,6 +281,63 @@ def _get_cached_instrument_id(self, symbol: str) -> InstrumentId:
nautilus_instrument_id: InstrumentId = bybit_symbol.parse_as_nautilus()
return nautilus_instrument_id

def _handle_ticker(self, instrument_type: BybitInstrumentType, raw: bytes) -> None:
msg = self._decoders["ticker"].decode(raw)
try:
symbol = msg.data.symbol + f"-{instrument_type.value.upper()}"
instrument_id: InstrumentId = self._get_cached_instrument_id(symbol)
last_quote = self._last_quotes.get(instrument_id)

quote = QuoteTick(
instrument_id=instrument_id,
bid_price=(
Price.from_str(msg.data.bid1Price)
if msg.data.bid1Price or last_quote is None
else last_quote.bid_price
),
ask_price=(
Price.from_str(msg.data.ask1Price)
if msg.data.ask1Price or last_quote is None
else last_quote.ask_price
),
bid_size=(
Quantity.from_str(msg.data.bid1Size)
if msg.data.bid1Size or last_quote is None
else last_quote.bid_size
),
ask_size=(
Quantity.from_str(msg.data.ask1Size)
if msg.data.ask1Size or last_quote is None
else last_quote.ask_size
),
ts_event=millis_to_nanos(msg.ts),
ts_init=self._clock.timestamp_ns(),
)

self._last_quotes[quote.instrument_id] = quote
self._handle_data(quote)
except Exception as e:
self._log.error(f"Failed to parse ticker: {msg} with error {e}")

def _handle_trade(self, instrument_type: BybitInstrumentType, raw: bytes) -> None:
msg = self._decoders["trade"].decode(raw)
try:
for data in msg.data:
symbol = data.s + f"-{instrument_type.value.upper()}"
instrument_id: InstrumentId = self._get_cached_instrument_id(symbol)
trade: TradeTick = data.parse_to_trade_tick(
instrument_id,
self._clock.timestamp_ns(),
)
self._handle_data(trade)
except Exception as e:
self._log.error(f"Failed to parse trade tick: {msg} with error {e}")

async def _request(self, data_type: DataType, correlation_id: UUID4) -> None:
if data_type.type == BybitTickerData:
symbol = data_type.metadata["symbol"]
await self._handle_ticker_data_request(symbol, correlation_id)

async def _request_instrument(
self,
instrument_id: InstrumentId,
Expand Down Expand Up @@ -438,8 +484,3 @@ async def _handle_ticker_data_request(self, symbol: Symbol, correlation_id: UUID
result,
correlation_id,
)

async def _request(self, data_type: DataType, correlation_id: UUID4) -> None:
if data_type.type == BybitTickerData:
symbol = data_type.metadata["symbol"]
await self._handle_ticker_data_request(symbol, correlation_id)
5 changes: 3 additions & 2 deletions nautilus_trader/adapters/bybit/execution.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
from nautilus_trader.model.enums import OmsType
from nautilus_trader.model.enums import OrderStatus
from nautilus_trader.model.enums import OrderType
from nautilus_trader.model.enums import account_type_to_str
from nautilus_trader.model.identifiers import AccountId
from nautilus_trader.model.identifiers import ClientId
from nautilus_trader.model.identifiers import ClientOrderId
Expand Down Expand Up @@ -123,7 +124,7 @@ def __init__(
# Configuration
self._use_position_ids = config.use_position_ids

self._log.info(f"Account type: ${self.account_type}", LogColor.BLUE)
self._log.info(f"Account type: {account_type_to_str(self.account_type)}", LogColor.BLUE)
self._instrument_types = instrument_types
self._enum_parser = BybitEnumParser()

Expand Down Expand Up @@ -423,7 +424,7 @@ def _handle_ws_message(self, raw: bytes) -> None:
except Exception as e:
ws_message_sub = self._decoder_ws_subscription.decode(raw)
if ws_message_sub.success:
self._log.info(f"Subscribed to stream {ws_message.topic}", LogColor.BLUE)
self._log.debug("Subscribed to stream")
else:
self._log.error(f"Failed to subscribe. {e!s}")

Expand Down
45 changes: 39 additions & 6 deletions nautilus_trader/adapters/bybit/schemas/ws.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
from nautilus_trader.core.datetime import millis_to_nanos
from nautilus_trader.core.uuid import UUID4
from nautilus_trader.execution.reports import OrderStatusReport
from nautilus_trader.model.data import QuoteTick
from nautilus_trader.model.data import TradeTick
from nautilus_trader.model.enums import AggressorSide
from nautilus_trader.model.identifiers import AccountId
Expand Down Expand Up @@ -146,7 +147,7 @@ class BybitWsOrderbookSnapshotMsg(msgspec.Struct):
class BybitWsTickerLinear(msgspec.Struct, omit_defaults=True, kw_only=True):
symbol: str
tickDirection: str | None = None
price24hPcnt: str
price24hPcnt: str | None = None
lastPrice: str | None = None
prevPrice24h: str | None = None
highPrice24h: str | None = None
Expand All @@ -159,11 +160,27 @@ class BybitWsTickerLinear(msgspec.Struct, omit_defaults=True, kw_only=True):
turnover24h: str | None = None
volume24h: str | None = None
nextFundingTime: str | None = None
fundingRate: str
bid1Price: str
bid1Size: str
ask1Price: str
ask1Size: str
fundingRate: str | None = None
bid1Price: str | None = None
bid1Size: str | None = None
ask1Price: str | None = None
ask1Size: str | None = None

def parse_to_quote_tick(
self,
instrument_id: InstrumentId,
ts_event: int,
ts_init: int,
) -> QuoteTick:
return QuoteTick(
instrument_id=instrument_id,
bid_price=Price.from_str(self.bid1Price),
ask_price=Price.from_str(self.ask1Price),
bid_size=Quantity.from_str(self.bid1Size),
ask_size=Quantity.from_str(self.ask1Size),
ts_event=ts_event,
ts_init=ts_init,
)


class BybitWsTickerLinearMsg(msgspec.Struct):
Expand Down Expand Up @@ -231,6 +248,22 @@ class BybitWsTickerOption(msgspec.Struct):
predictedDeliveryPrice: str
change24h: str

def parse_to_quote_tick(
self,
instrument_id: InstrumentId,
ts_event: int,
ts_init: int,
) -> QuoteTick:
return QuoteTick(
instrument_id=instrument_id,
bid_price=Price.from_str(self.bidPrice),
ask_price=Price.from_str(self.askPrice),
bid_size=Quantity.from_str(self.bidSize),
ask_size=Quantity.from_str(self.askSize),
ts_event=ts_event,
ts_init=ts_init,
)


class BybitWsTickerOptionMsg(msgspec.Struct):
topic: str
Expand Down
20 changes: 20 additions & 0 deletions nautilus_trader/adapters/bybit/websocket/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,26 @@ async def subscribe_tickers(self, symbol: str) -> None:
await self._client.send_text(json.dumps(sub))
self._subscriptions.append(subscription)

async def unsubscribe_trades(self, symbol: str) -> None:
if self._client is None:
self._log.warning("Cannot subscribe: not connected")
return

subscription = f"publicTrade.{symbol}"
sub = {"op": "unsubscribe", "args": [subscription]}
await self._client.send_text(json.dumps(sub))
self._subscriptions.remove(subscription)

async def unsubscribe_tickers(self, symbol: str) -> None:
if self._client is None:
self._log.warning("Cannot subscribe: not connected")
return

subscription = f"tickers.{symbol}"
sub = {"op": "unsubscribe", "args": [subscription]}
await self._client.send_text(json.dumps(sub))
self._subscriptions.remove(subscription)

################################################################################
# Private
################################################################################
Expand Down

0 comments on commit a05c0bc

Please sign in to comment.