Skip to content

Commit

Permalink
Implement new Binance websocket spec
Browse files Browse the repository at this point in the history
  • Loading branch information
cjdsellers committed Mar 8, 2024
1 parent 5bf2e79 commit 954cdeb
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 16 deletions.
1 change: 1 addition & 0 deletions RELEASES.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ None
- Fixed Interactive Brokers connection error logging (#1524), thanks @benjaminsingleton
- Fixed `SimulationModuleConfig` location and missing re-export from `config` subpackage
- Fixed logging `StdoutWriter` from also writing error logs (writers were duplicating error logs)
- Fixed `BinanceWebSocketClient` to [new specification](https://binance-docs.github.io/apidocs/futures/en/#websocket-market-streams) which requires responding to pings with a pong containing the pings payload

---

Expand Down
23 changes: 14 additions & 9 deletions nautilus_core/network/src/websocket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use futures_util::{
SinkExt, StreamExt,
};
use hyper::header::HeaderName;
use nautilus_core::python::to_pyruntime_err;
use nautilus_core::python::{to_pyruntime_err, to_pyvalue_err};
use pyo3::{prelude::*, types::PyBytes};
use tokio::{net::TcpStream, sync::Mutex, task, time::sleep};
use tokio_tungstenite::{
Expand Down Expand Up @@ -109,7 +109,6 @@ impl WebSocketClientInner {

// Keep receiving messages from socket and pass them as arguments to handler
let read_task = Self::spawn_read_task(reader, handler.clone(), ping_handler.clone());

let heartbeat_task =
Self::spawn_heartbeat_task(*heartbeat, heartbeat_msg.clone(), writer.clone());

Expand Down Expand Up @@ -148,7 +147,7 @@ impl WebSocketClientInner {
message: Option<String>,
writer: SharedMessageWriter,
) -> Option<task::JoinHandle<()>> {
debug!("Started heartbeat task");
debug!("Started task `heartbeat`");
heartbeat.map(|duration| {
task::spawn(async move {
let duration = Duration::from_secs(duration);
Expand All @@ -174,7 +173,7 @@ impl WebSocketClientInner {
handler: PyObject,
ping_handler: Option<PyObject>,
) -> task::JoinHandle<()> {
debug!("Started read task");
debug!("Started task `read`");
task::spawn(async move {
loop {
match reader.next().await {
Expand All @@ -199,7 +198,8 @@ impl WebSocketClientInner {
continue;
}
Some(Ok(Message::Ping(ping))) => {
debug!("Received ping");
let payload = String::from_utf8(ping.clone()).expect("Invalid payload");
debug!("Received ping: {payload}",);
if let Some(ref handler) = ping_handler {
if let Err(e) =
Python::with_gil(|py| handler.call1(py, (PyBytes::new(py, &ping),)))
Expand Down Expand Up @@ -261,7 +261,7 @@ impl WebSocketClientInner {
debug!("Closed connection");
}

/// Reconnect with server
/// Reconnect with server.
///
/// Make a new connection with server. Use the new read and write halves
/// to update self writer and read and heartbeat tasks.
Expand Down Expand Up @@ -327,13 +327,14 @@ impl WebSocketClient {
/// Creates a websocket client.
///
/// Creates an inner client and controller task to reconnect or disconnect
/// the client. Also assumes ownership of writer from inner client
/// the client. Also assumes ownership of writer from inner client.
pub async fn connect(
config: WebSocketConfig,
post_connection: Option<PyObject>,
post_reconnection: Option<PyObject>,
post_disconnection: Option<PyObject>,
) -> Result<Self, Error> {
debug!("Connecting");
let inner = WebSocketClientInner::connect_url(config).await?;
let writer = inner.writer.clone();
let disconnect_mode = Arc::new(Mutex::new(false));
Expand Down Expand Up @@ -368,10 +369,12 @@ impl WebSocketClient {
/// Controller task will periodically check the disconnect mode
/// and shutdown the client if it is alive
pub async fn disconnect(&self) {
debug!("Disconnecting");
*self.disconnect_mode.lock().await = true;
}

pub async fn send_bytes(&self, data: Vec<u8>) -> Result<(), Error> {
debug!("Sending bytes: {:?}", data);
let mut guard = self.writer.lock().await;
guard.send(Message::Binary(data)).await
}
Expand Down Expand Up @@ -494,7 +497,6 @@ impl WebSocketClient {
#[pyo3(name = "disconnect")]
fn py_disconnect<'py>(slf: PyRef<'_, Self>, py: Python<'py>) -> PyResult<&'py PyAny> {
let disconnect_mode = slf.disconnect_mode.clone();
debug!("Setting disconnect mode to true");
pyo3_asyncio::tokio::future_into_py(py, async move {
*disconnect_mode.lock().await = true;
Ok(())
Expand All @@ -508,6 +510,7 @@ impl WebSocketClient {
/// - Raises PyRuntimeError if not able to send data.
#[pyo3(name = "send")]
fn py_send<'py>(slf: PyRef<'_, Self>, data: Vec<u8>, py: Python<'py>) -> PyResult<&'py PyAny> {
debug!("Sending bytes {:?}", data);
let writer = slf.writer.clone();
pyo3_asyncio::tokio::future_into_py(py, async move {
let mut guard = writer.lock().await;
Expand All @@ -529,6 +532,7 @@ impl WebSocketClient {
data: String,
py: Python<'py>,
) -> PyResult<&'py PyAny> {
debug!("Sending text: {}", data);
let writer = slf.writer.clone();
pyo3_asyncio::tokio::future_into_py(py, async move {
let mut guard = writer.lock().await;
Expand All @@ -550,7 +554,8 @@ impl WebSocketClient {
data: Vec<u8>,
py: Python<'py>,
) -> PyResult<&'py PyAny> {
debug!("Sending pong");
let data_str = String::from_utf8(data.clone()).map_err(to_pyvalue_err)?;
debug!("Sending pong: {}", data_str);
let writer = slf.writer.clone();
pyo3_asyncio::tokio::future_into_py(py, async move {
let mut guard = writer.lock().await;
Expand Down
19 changes: 12 additions & 7 deletions nautilus_trader/adapters/binance/websocket/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ async def connect(self) -> None:
handler=self._handler,
heartbeat=60,
headers=[],
ping_handler=self.handle_ping,
ping_handler=self._handle_ping,
)

self._inner = await WebSocketClient.connect(
Expand All @@ -133,12 +133,17 @@ async def connect(self) -> None:
self._log.info(f"Connected to {self._base_url}.", LogColor.BLUE)
self._log.info(f"Subscribed to {initial_stream}.", LogColor.BLUE)

def handle_ping(self, raw: bytes) -> None:
def _handle_ping(self, raw: bytes) -> None:
self._loop.create_task(self.send_pong(raw))

async def send_pong(self, raw: bytes) -> None:
"""
Send the given raw payload to the server as a PONG message.
"""
if self._inner is None:
return

# Send ping payload back to server as pong
# self._inner.send_pong(raw) # WIP
await self._inner.send_pong(raw)

# TODO: Temporarily synch
def reconnect(self) -> None:
Expand Down Expand Up @@ -466,7 +471,7 @@ async def _subscribe(self, stream: str) -> None:
message = self._create_subscribe_msg(streams=[stream])
self._log.debug(f"SENDING: {message}")

self._inner.send_text(json.dumps(message))
await self._inner.send_text(json.dumps(message))
self._log.info(f"Subscribed to {stream}.", LogColor.BLUE)

async def _subscribe_all(self) -> None:
Expand All @@ -477,7 +482,7 @@ async def _subscribe_all(self) -> None:
message = self._create_subscribe_msg(streams=self._streams)
self._log.debug(f"SENDING: {message}")

self._inner.send_text(json.dumps(message))
await self._inner.send_text(json.dumps(message))
for stream in self._streams:
self._log.info(f"Subscribed to {stream}.", LogColor.BLUE)

Expand All @@ -495,7 +500,7 @@ async def _unsubscribe(self, stream: str) -> None:
message = self._create_unsubscribe_msg(streams=[stream])
self._log.debug(f"SENDING: {message}")

self._inner.send_text(json.dumps(message))
await self._inner.send_text(json.dumps(message))
self._log.info(f"Unsubscribed from {stream}.", LogColor.BLUE)

def _create_subscribe_msg(self, streams: list[str]) -> dict[str, Any]:
Expand Down

0 comments on commit 954cdeb

Please sign in to comment.