Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

bybit: add trade spot and trade option websocket msgspec schemas #1570

Merged
merged 4 commits into from
Mar 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 17 additions & 13 deletions nautilus_trader/adapters/bybit/data.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
# -------------------------------------------------------------------------------------------------

import asyncio
from collections import defaultdict
from functools import partial

import msgspec
import pandas as pd
Expand Down Expand Up @@ -131,22 +133,24 @@ def __init__(

# WebSocket API
self._ws_clients: dict[BybitInstrumentType, BybitWebsocketClient] = {}
self._decoders: dict[str, dict[BybitInstrumentType, msgspec.json.Decoder]] = defaultdict(
dict,
)
for instrument_type in instrument_types:
self._ws_clients[instrument_type] = BybitWebsocketClient(
clock=clock,
handler=lambda x: self._handle_ws_message(instrument_type, x),
handler=partial(self._handle_ws_message, instrument_type),
davidsblom marked this conversation as resolved.
Show resolved Hide resolved
base_url=ws_urls[instrument_type],
api_key=config.api_key or get_api_key(config.testnet),
api_secret=config.api_secret or get_api_secret(config.testnet),
)

# WebSocket decoders
self._decoders = {
"orderbook": decoder_ws_orderbook(),
"trade": decoder_ws_trade(),
"ticker": decoder_ws_ticker(instrument_type),
"kline": decoder_ws_kline(),
}
self._decoders["orderbook"][instrument_type] = decoder_ws_orderbook()
self._decoders["trade"][instrument_type] = decoder_ws_trade(instrument_type)
self._decoders["ticker"][instrument_type] = decoder_ws_ticker(instrument_type)
self._decoders["kline"][instrument_type] = decoder_ws_kline()

self._decoder_ws_msg_general = msgspec.json.Decoder(BybitWsMessageGeneral)

self._tob_quotes: set[InstrumentId] = set()
Expand Down Expand Up @@ -593,12 +597,12 @@ def _handle_ws_data(self, instrument_type: BybitInstrumentType, topic: str, raw:
elif "tickers" in topic:
self._handle_ticker(instrument_type, raw)
elif "kline" in topic:
self._handle_kline(raw)
self._handle_kline(instrument_type, raw)
else:
self._log.error(f"Unknown websocket message topic: {topic} in Bybit")

def _handle_orderbook(self, instrument_type: BybitInstrumentType, raw: bytes) -> None:
msg = self._decoders["orderbook"].decode(raw)
msg = self._decoders["orderbook"][instrument_type].decode(raw)
symbol = msg.data.s + f"-{instrument_type.value.upper()}"
instrument_id: InstrumentId = self._get_cached_instrument_id(symbol)

Expand Down Expand Up @@ -639,7 +643,7 @@ def _handle_orderbook(self, instrument_type: BybitInstrumentType, raw: bytes) ->
self._handle_data(deltas)

def _handle_ticker(self, instrument_type: BybitInstrumentType, raw: bytes) -> None:
msg = self._decoders["ticker"].decode(raw)
msg = self._decoders["ticker"][instrument_type].decode(raw)
try:
symbol = msg.data.symbol + f"-{instrument_type.value.upper()}"
instrument_id: InstrumentId = self._get_cached_instrument_id(symbol)
Expand Down Expand Up @@ -677,7 +681,7 @@ def _handle_ticker(self, instrument_type: BybitInstrumentType, raw: bytes) -> No
self._log.error(f"Failed to parse ticker: {msg} with error {e}")

def _handle_trade(self, instrument_type: BybitInstrumentType, raw: bytes) -> None:
msg = self._decoders["trade"].decode(raw)
msg = self._decoders["trade"][instrument_type].decode(raw)
try:
for data in msg.data:
symbol = data.s + f"-{instrument_type.value.upper()}"
Expand All @@ -690,8 +694,8 @@ def _handle_trade(self, instrument_type: BybitInstrumentType, raw: bytes) -> Non
except Exception as e:
self._log.error(f"Failed to parse trade tick: {msg} with error {e}")

def _handle_kline(self, raw: bytes) -> None:
msg = self._decoders["kline"].decode(raw)
def _handle_kline(self, instrument_type: BybitInstrumentType, raw: bytes) -> None:
msg = self._decoders["kline"][instrument_type].decode(raw)
try:
bar_type = self._topic_bar_type.get(msg.topic)
for data in msg.data:
Expand Down
99 changes: 93 additions & 6 deletions nautilus_trader/adapters/bybit/schemas/ws.py
Original file line number Diff line number Diff line change
Expand Up @@ -448,7 +448,39 @@ class BybitWsTickerOptionMsg(msgspec.Struct):
################################################################################


class BybitWsTrade(msgspec.Struct):
class BybitWsTradeSpot(msgspec.Struct):
# The timestamp (ms) that the order is filled
T: int
# Symbol name
s: str
# Side of taker. Buy,Sell
S: str
# Trade size
v: str
# Trade price
p: str
# Trade id
i: str
# Whether is a block trade or not
BT: bool

def parse_to_trade_tick(
self,
instrument_id: InstrumentId,
ts_init: int,
) -> TradeTick:
return TradeTick(
instrument_id=instrument_id,
price=Price.from_str(self.p),
size=Quantity.from_str(self.v),
aggressor_side=AggressorSide.SELLER if self.S == "Sell" else AggressorSide.BUYER,
trade_id=TradeId(str(self.i)),
ts_event=millis_to_nanos(self.T),
ts_init=ts_init,
)


class BybitWsTradeLinear(msgspec.Struct):
# The timestamp (ms) that the order is filled
T: int
# Symbol name
Expand Down Expand Up @@ -482,15 +514,70 @@ def parse_to_trade_tick(
)


class BybitWsTradeMsg(msgspec.Struct):
class BybitWsTradeOption(msgspec.Struct):
# Message id unique to options
id: str
# The timestamp (ms) that the order is filled
T: int
# Symbol name
s: str
# Side of taker. Buy,Sell
S: str
# Trade size
v: str
# Trade price
p: str
# Trade id
i: str
# Whether is a block trade or not
BT: bool

def parse_to_trade_tick(
self,
instrument_id: InstrumentId,
ts_init: int,
) -> TradeTick:
return TradeTick(
instrument_id=instrument_id,
price=Price.from_str(self.p),
size=Quantity.from_str(self.v),
aggressor_side=AggressorSide.SELLER if self.S == "Sell" else AggressorSide.BUYER,
trade_id=TradeId(str(self.i)),
ts_event=millis_to_nanos(self.T),
ts_init=ts_init,
)


class BybitWsTradeSpotMsg(msgspec.Struct):
topic: str
type: str
ts: int
data: list[BybitWsTradeSpot]


class BybitWsTradeLinearMsg(msgspec.Struct):
topic: str
type: str
ts: int
data: list[BybitWsTrade]
data: list[BybitWsTradeLinear]


def decoder_ws_trade():
return msgspec.json.Decoder(BybitWsTradeMsg)
class BybitWsTradeOptionMsg(msgspec.Struct):
topic: str
type: str
ts: int
data: list[BybitWsTradeOption]


def decoder_ws_trade(instrument_type: BybitInstrumentType) -> msgspec.json.Decoder:
if instrument_type == BybitInstrumentType.LINEAR:
return msgspec.json.Decoder(BybitWsTradeLinearMsg)
elif instrument_type == BybitInstrumentType.SPOT:
return msgspec.json.Decoder(BybitWsTradeSpotMsg)
elif instrument_type == BybitInstrumentType.OPTION:
return msgspec.json.Decoder(BybitWsTradeOptionMsg)
else:
raise ValueError(f"Invalid instrument type: {instrument_type}")


def decoder_ws_ticker(instrument_type: BybitInstrumentType) -> msgspec.json.Decoder:
Expand All @@ -501,7 +588,7 @@ def decoder_ws_ticker(instrument_type: BybitInstrumentType) -> msgspec.json.Deco
elif instrument_type == BybitInstrumentType.OPTION:
return msgspec.json.Decoder(BybitWsTickerOptionMsg)
else:
raise ValueError(f"Invalid account type: {instrument_type}")
raise ValueError(f"Invalid instrument type: {instrument_type}")


def decoder_ws_kline():
Expand Down
32 changes: 28 additions & 4 deletions tests/integration_tests/adapters/bybit/test_ws_decoders.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,10 @@
from nautilus_trader.adapters.bybit.schemas.ws import BybitWsTickerOptionMsg
from nautilus_trader.adapters.bybit.schemas.ws import BybitWsTickerSpot
from nautilus_trader.adapters.bybit.schemas.ws import BybitWsTickerSpotMsg
from nautilus_trader.adapters.bybit.schemas.ws import BybitWsTrade
from nautilus_trader.adapters.bybit.schemas.ws import BybitWsTradeMsg
from nautilus_trader.adapters.bybit.schemas.ws import BybitWsTradeLinear
from nautilus_trader.adapters.bybit.schemas.ws import BybitWsTradeLinearMsg
from nautilus_trader.adapters.bybit.schemas.ws import BybitWsTradeSpot
from nautilus_trader.adapters.bybit.schemas.ws import BybitWsTradeSpotMsg


class TestBybitWsDecoders:
Expand Down Expand Up @@ -262,9 +264,9 @@ def test_ws_public_trade(self):
"ws_trade.json",
)
assert item is not None
decoder = msgspec.json.Decoder(BybitWsTradeMsg)
decoder = msgspec.json.Decoder(BybitWsTradeLinearMsg)
result = decoder.decode(item)
target_trade = BybitWsTrade(
target_trade = BybitWsTradeLinear(
T=1672304486865,
s="BTCUSDT",
S="Buy",
Expand All @@ -279,6 +281,28 @@ def test_ws_public_trade(self):
assert result.type == "snapshot"
assert result.ts == 1672304486868

def test_ws_public_trade_spot(self):
item = pkgutil.get_data(
"tests.integration_tests.adapters.bybit.resources.ws_messages.public",
"ws_trade.json",
)
assert item is not None
decoder = msgspec.json.Decoder(BybitWsTradeSpotMsg)
result = decoder.decode(item)
target_trade = BybitWsTradeSpot(
T=1672304486865,
s="BTCUSDT",
S="Buy",
v="0.001",
p="16578.50",
i="20f43950-d8dd-5b31-9112-a178eb6023af",
BT=False,
)
assert result.data == [target_trade]
assert result.topic == "publicTrade.BTCUSDT"
assert result.type == "snapshot"
assert result.ts == 1672304486868

def test_ws_private_execution(self):
item = pkgutil.get_data(
"tests.integration_tests.adapters.bybit.resources.ws_messages.private",
Expand Down