Skip to content

Commit

Permalink
Use retry manager when sending websocket messages for dYdX
Browse files Browse the repository at this point in the history
  • Loading branch information
davidsblom committed Jan 9, 2025
1 parent b336361 commit 1543788
Show file tree
Hide file tree
Showing 4 changed files with 48 additions and 16 deletions.
6 changes: 6 additions & 0 deletions nautilus_trader/adapters/dydx/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,19 @@ class DYDXDataClientConfig(LiveDataClientConfig, frozen=True):
max_ws_reconnection_tries: int, default 3
The number of retries to reconnect the websocket connection if the
connection is broken.
max_ws_send_retries : int, optional
Maximum retries when sending websocket messages.
max_ws_retry_delay_secs : float, optional
The delay (seconds) between retry attempts when resending websocket messages.
"""

wallet_address: str | None = None
is_testnet: bool = False
update_instruments_interval_mins: PositiveInt | None = 60
max_ws_reconnection_tries: int | None = 3
max_ws_send_retries: PositiveInt | None = None
max_ws_retry_delay_secs: PositiveFloat | None = None


class DYDXExecClientConfig(LiveExecClientConfig, frozen=True):
Expand Down
2 changes: 2 additions & 0 deletions nautilus_trader/adapters/dydx/data.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,8 @@ def __init__(
base_url=ws_base_url,
loop=loop,
max_reconnection_tries=config.max_ws_reconnection_tries,
max_send_retries=config.max_ws_send_retries,
retry_delay_secs=config.max_ws_retry_delay_secs,
)

# HTTP API
Expand Down
9 changes: 2 additions & 7 deletions nautilus_trader/adapters/dydx/http/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
from nautilus_trader.adapters.dydx.common.constants import DYDX_RETRY_ERRORS_GRPC
from nautilus_trader.adapters.dydx.grpc.errors import DYDXGRPCError
from nautilus_trader.core.nautilus_pyo3 import HttpTimeoutError
from nautilus_trader.core.nautilus_pyo3 import WebSocketClientError


class DYDXError(Exception):
Expand Down Expand Up @@ -58,13 +59,7 @@ def should_retry(error: BaseException) -> bool:
if isinstance(error, DYDXGRPCError):
return error.code in DYDX_RETRY_ERRORS_GRPC

if isinstance(error, AioRpcError):
return True

if isinstance(error, DYDXError):
return True

if isinstance(error, HttpTimeoutError):
if isinstance(error, AioRpcError | DYDXError | HttpTimeoutError | WebSocketClientError):
return True

return False
47 changes: 38 additions & 9 deletions nautilus_trader/adapters/dydx/websocket/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,15 @@
import pandas as pd

from nautilus_trader.adapters.dydx.common.enums import DYDXCandlesResolution
from nautilus_trader.adapters.dydx.http.errors import should_retry
from nautilus_trader.common.component import LiveClock
from nautilus_trader.common.component import Logger
from nautilus_trader.common.enums import LogColor
from nautilus_trader.core.nautilus_pyo3 import Quota
from nautilus_trader.core.nautilus_pyo3 import WebSocketClient
from nautilus_trader.core.nautilus_pyo3 import WebSocketClientError
from nautilus_trader.core.nautilus_pyo3 import WebSocketConfig
from nautilus_trader.live.retry import RetryManagerPool


class DYDXWebsocketClient:
Expand All @@ -52,9 +54,13 @@ class DYDXWebsocketClient:
The event loop for the client.
subscription_rate_limit_per_second : int, default 2
The maximum number of subscription message to send to the venue.
max_reconnection_tries: int, default 3
max_reconnection_tries : int, default 3
The number of retries to reconnect the websocket connection if the
connection is broken.
max_send_retries : int, optional
Maximum retries when sending websocket messages.
retry_delay_secs : float, optional
The delay (seconds) between retry attempts when resending websocket messages.
"""

Expand All @@ -67,6 +73,8 @@ def __init__(
loop: asyncio.AbstractEventLoop,
subscription_rate_limit_per_second: int = 2,
max_reconnection_tries: int | None = 3,
max_send_retries: int | None = 0,
retry_delay_secs: float | None = None,
) -> None:
"""
Provide a dYdX streaming WebSocket client.
Expand All @@ -85,6 +93,8 @@ def __init__(
self._msg_timestamp = self._clock.utc_now()
self._msg_timeout_secs: int = 60
self._reconnect_task: asyncio.Task | None = None
self._max_send_retries = max_send_retries
self._retry_delay_secs = retry_delay_secs

def is_connected(self) -> bool:
"""
Expand Down Expand Up @@ -143,6 +153,15 @@ async def connect(self) -> None:
Connect to the websocket server.
"""
self._is_running = True
self._retry_manager_pool = RetryManagerPool(
pool_size=100,
max_retries=self._max_send_retries or 0,
retry_delay_secs=self._retry_delay_secs or 1.0,
logger=self._log,
exc_types=(WebSocketClientError,),
retry_check=should_retry,
)

self._log.debug(f"Connecting to {self._base_url} websocket stream")
config = WebSocketConfig(
url=self._base_url,
Expand Down Expand Up @@ -197,10 +216,13 @@ async def send_pong(self, raw: bytes) -> None:
if self._client is None:
return

try:
await self._client.send_pong(raw)
except WebSocketClientError as e:
self._log.error(f"Failed to send pong: {e}")
async with self._retry_manager_pool as retry_manager:
await retry_manager.run(
name="send_pong",
details=[raw],
func=self._client.send_pong,
data=raw,
)

async def _reconnect_guard(self) -> None:
"""
Expand Down Expand Up @@ -270,6 +292,8 @@ async def disconnect(self) -> None:

self._client = None # Dispose (will go out of scope)

self._retry_manager_pool.shutdown()

self._log.info(f"Disconnected from {self._base_url}", LogColor.BLUE)

async def subscribe_trades(self, symbol: str) -> None:
Expand Down Expand Up @@ -592,7 +616,12 @@ async def _send(self, msg: dict[str, Any]) -> None:

self._log.debug(f"SENDING: {msg}")

try:
await self._client.send_text(msgspec.json.encode(msg))
except WebSocketClientError as e:
self._log.error(f"Failed to send websocket message: {e}")
data = msgspec.json.encode(msg)

async with self._retry_manager_pool as retry_manager:
await retry_manager.run(
name="send_text",
details=[data],
func=self._client.send_text,
data=data,
)

0 comments on commit 1543788

Please sign in to comment.