Skip to content

Commit

Permalink
Continue Bybit adapter
Browse files Browse the repository at this point in the history
  • Loading branch information
cjdsellers committed Mar 24, 2024
1 parent c1ad9e5 commit 181efe5
Show file tree
Hide file tree
Showing 7 changed files with 105 additions and 39 deletions.
7 changes: 5 additions & 2 deletions examples/live/bybit/bybit_market_maker.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

from decimal import Decimal

from nautilus_trader.adapters.bybit.common.enums import BybitInstrumentType
from nautilus_trader.adapters.bybit.config import BybitDataClientConfig
from nautilus_trader.adapters.bybit.config import BybitExecClientConfig
from nautilus_trader.adapters.bybit.factories import BybitLiveDataClientFactory
Expand Down Expand Up @@ -71,8 +72,9 @@
api_key=None, # 'BYBIT_API_KEY' env var
api_secret=None, # 'BYBIT_API_SECRET' env var
base_url_http=None, # Override with custom endpoint
testnet=False, # If client uses the testnet
instrument_provider=InstrumentProviderConfig(load_all=True),
instrument_types=[BybitInstrumentType.LINEAR],
testnet=False, # If client uses the testnet
),
},
exec_clients={
Expand All @@ -81,8 +83,9 @@
api_secret=None, # 'BYBIT_API_SECRET' env var
base_url_http=None, # Override with custom endpoint
base_url_ws=None, # Override with custom endpoint
testnet=False, # If client uses the testnet
instrument_provider=InstrumentProviderConfig(load_all=True),
instrument_types=[BybitInstrumentType.LINEAR],
testnet=False, # If client uses the testnet
),
},
timeout_connection=20.0,
Expand Down
65 changes: 37 additions & 28 deletions nautilus_trader/adapters/bybit/data.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
from nautilus_trader.cache.cache import Cache
from nautilus_trader.common.component import LiveClock
from nautilus_trader.common.component import MessageBus
from nautilus_trader.common.enums import LogColor
from nautilus_trader.core.datetime import secs_to_millis
from nautilus_trader.core.message import Request
from nautilus_trader.core.nautilus_pyo3 import Symbol
Expand Down Expand Up @@ -163,7 +164,7 @@ async def fetch_send_tickers(
self._msgbus.response(data)

def complete_fetch_tickers_task(self, request: Request) -> None:
# extract symbol from metadat
# Extract symbol from metadata
if "symbol" not in request.metadata:
raise ValueError("Symbol not in request metadata")
symbol = request.metadata["symbol"]
Expand Down Expand Up @@ -192,6 +193,14 @@ async def _connect(self) -> None:
await ws_client.connect()
self._log.info("Data client connected.")

async def _disconnect(self) -> None:
if self._update_instruments_task:
self._log.debug("Cancelling `update_instruments` task.")
self._update_instruments_task.cancel()
self._update_instruments_task = None
for ws_client in self._ws_clients.values():
await ws_client.disconnect()

def _send_all_instruments_to_data_engine(self) -> None:
for instrument in self._instrument_provider.get_all().values():
self._handle_data(instrument)
Expand All @@ -212,31 +221,47 @@ async def _update_instruments(self) -> None:
except asyncio.CancelledError:
self._log.debug("Canceled `update_instruments` task.")

async def _subscribe_quote_ticks(self, instrument_id: InstrumentId) -> None:
bybit_symbol = BybitSymbol(instrument_id.symbol.value)
assert bybit_symbol # type checking
ws_client = self._ws_clients[bybit_symbol.instrument_type]
await ws_client.subscribe_tickers(bybit_symbol.raw_symbol)
self._log.info(f"Subscribed {instrument_id} quote ticks.", LogColor.BLUE)

async def _subscribe_trade_ticks(self, instrument_id: InstrumentId) -> None:
bybit_symbol = BybitSymbol(instrument_id.symbol.value)
assert bybit_symbol # type checking
ws_client = self._ws_clients[bybit_symbol.instrument_type]
await ws_client.subscribe_trades(bybit_symbol.raw_symbol)
self._log.info(f"Subscribed to trade ticks for {instrument_id}.")

# async def _subscribe_ticker(self, instrument_id: InstrumentId) -> None:
# symbol = BybitSymbol(instrument_id.symbol.value)
# ws_client = self._ws_clients[symbol.instrument_type]
# await ws_client.subscribe_tickers(symbol.raw_symbol)
# self._log.info(f"Subscribed to ticker for {instrument_id}.")
self._log.info(f"Subscribed {instrument_id} trade ticks.", LogColor.BLUE)

def _handle_ws_message(self, instrument_type: BybitInstrumentType, raw: bytes) -> None:
try:
ws_message = self._decoder_ws_msg_general.decode(raw)
if ws_message.success is False:
self._log.error(f"Error in ws_message: {ws_message.ret_msg}")
return
## check if there is topic, if not discard it
## Check if there is topic, if not discard the message
if ws_message.topic:
self._topic_check(instrument_type, ws_message.topic, raw)
except Exception as e:
decoded_raw = raw.decode("utf-8")
raise RuntimeError(f"Unknown websocket message type: {decoded_raw}") from e
self._log.error(f"Failed to parse ticker: {raw.decode()} with error {e}")

def _handle_ticker(self, instrument_type: BybitInstrumentType, raw: bytes) -> None:
try:
msg = self._decoders["ticker"].decode(raw)
self._log.warning(f"{msg}")

for quote_tick in msg.data:
symbol = quote_tick.s + f"-{instrument_type.value.upper()}"
instrument_id: InstrumentId = self._get_cached_instrument_id(symbol)
quote_tick = quote_tick.parse_to_quote_tick(
instrument_id,
self._clock.timestamp_ns(),
)
self._handle_data(quote_tick)
except Exception as e:
self._log.error(f"Failed to parse ticker: {raw.decode()} with error {e}")

def _handle_trade(self, instrument_type: BybitInstrumentType, raw: bytes) -> None:
try:
Expand All @@ -250,15 +275,7 @@ def _handle_trade(self, instrument_type: BybitInstrumentType, raw: bytes) -> Non
)
self._handle_data(trade_tick)
except Exception as e:
print("error in handle trade", e)
decoded_raw = raw.decode("utf-8")
self._log.error(f"Failed to parse trade tick: {decoded_raw}")

def _handle_ticker(self, instrument_type: BybitInstrumentType, raw: bytes) -> None:
try:
self._decoders["ticker"].decode(raw)
except Exception:
print("failed to parse ticker ", raw)
self._log.error(f"Failed to parse trade tick: {raw.decode()} with error {e}")

def _topic_check(self, instrument_type: BybitInstrumentType, topic: str, raw: bytes) -> None:
if "publicTrade" in topic:
Expand Down Expand Up @@ -390,14 +407,6 @@ async def _request_bars(
partial: Bar = bars.pop()
self._handle_bars(bar_type, bars, partial, correlation_id)

async def _disconnect(self) -> None:
if self._update_instruments_task:
self._log.debug("Cancelling `update_instruments` task.")
self._update_instruments_task.cancel()
self._update_instruments_task = None
for instrument_type, ws_client in self._ws_clients.items():
await ws_client.disconnect()

async def _handle_ticker_data_request(self, symbol: Symbol, correlation_id: UUID4) -> None:
bybit_symbol = BybitSymbol(symbol.value)
assert bybit_symbol # type checking
Expand Down
2 changes: 1 addition & 1 deletion nautilus_trader/adapters/bybit/execution.py
Original file line number Diff line number Diff line change
Expand Up @@ -423,7 +423,7 @@ def _handle_ws_message(self, raw: bytes) -> None:
except Exception as e:
ws_message_sub = self._decoder_ws_subscription.decode(raw)
if ws_message_sub.success:
self._log.info("Success subscribing")
self._log.info(f"Subscribed to stream {ws_message.topic}", LogColor.BLUE)
else:
self._log.error(f"Failed to subscribe. {e!s}")

Expand Down
10 changes: 5 additions & 5 deletions nautilus_trader/adapters/bybit/schemas/instrument.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,8 @@ def parse_to_instrument(
size_precision=size_increment,
price_increment=price_increment,
size_increment=size_increment,
margin_init=Decimal(0.1),
margin_maint=Decimal(0.1),
margin_init=Decimal("0.1"),
margin_maint=Decimal("0.1"),
maker_fee=Decimal(fee_rate.makerFeeRate),
taker_fee=Decimal(fee_rate.takerFeeRate),
ts_event=ts_event,
Expand Down Expand Up @@ -236,7 +236,7 @@ def parse_to_instrument(
base_currency=base_currency,
quote_currency=quote_currency,
settlement_currency=settlement_currency,
is_inverse=False, # No inverse instruments trade on Binance
is_inverse=False, # No inverse instruments trade on Bybit
price_precision=price_precision,
size_precision=size_precision,
price_increment=price_increment,
Expand All @@ -247,8 +247,8 @@ def parse_to_instrument(
min_notional=min_notional,
max_price=max_price,
min_price=min_price,
margin_init=Decimal(0.1),
margin_maint=Decimal(0.1),
margin_init=Decimal("0.1"),
margin_maint=Decimal("0.1"),
maker_fee=Decimal(maker_fee),
taker_fee=Decimal(taker_fee),
ts_event=ts_event,
Expand Down
2 changes: 1 addition & 1 deletion nautilus_trader/adapters/bybit/schemas/ws.py
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,7 @@ def decoder_ws_trade():
return msgspec.json.Decoder(BybitWsTradeMsg)


def decoder_ws_ticker(instrument_type: BybitInstrumentType):
def decoder_ws_ticker(instrument_type: BybitInstrumentType) -> msgspec.json.Decoder:
if instrument_type == BybitInstrumentType.LINEAR:
return msgspec.json.Decoder(BybitWsTickerLinearMsg)
elif instrument_type == BybitInstrumentType.SPOT:
Expand Down
4 changes: 2 additions & 2 deletions nautilus_trader/adapters/bybit/websocket/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,8 @@ def __init__(
clock: LiveClock,
base_url: str,
handler: Callable[[bytes], None],
api_key: str | None = None,
api_secret: str | None = None,
api_key: str,
api_secret: str,
is_private: bool | None = False,
) -> None:
self._clock = clock
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
# -------------------------------------------------------------------------------------------------
# Copyright (C) 2015-2024 Nautech Systems Pty Ltd. All rights reserved.
# https://nautechsystems.io
#
# Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
# You may not use this file except in compliance with the License.
# You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# -------------------------------------------------------------------------------------------------

import os

import pytest

from nautilus_trader.adapters.bybit.common.enums import BybitInstrumentType
from nautilus_trader.adapters.bybit.factories import get_bybit_http_client
from nautilus_trader.adapters.bybit.provider import BybitInstrumentProvider
from nautilus_trader.common.component import LiveClock
from nautilus_trader.model.identifiers import InstrumentId


@pytest.mark.asyncio()
async def test_bybit_instrument_provider():
clock = LiveClock()
client = get_bybit_http_client(
clock=clock,
key=os.getenv("BYBIT_API_KEY"),
secret=os.getenv("BYBIT_API_SECRET"),
is_testnet=False,
)

provider = BybitInstrumentProvider(
client=client,
clock=clock,
instrument_types=[
BybitInstrumentType.SPOT,
BybitInstrumentType.LINEAR,
BybitInstrumentType.OPTION,
# BybitInstrumentType.INVERSE, # Supported?
],
)

# await provider.load_all_async()
ethusdt_linear = InstrumentId.from_str("ETHUSDT-LINEAR.BYBIT")
await provider.load_ids_async(instrument_ids=[ethusdt_linear])
await provider.load_all_async()

print(provider.list_all())
print(provider.count)

0 comments on commit 181efe5

Please sign in to comment.