From d35cfa6b047273e5e00e9b761a5bbb303233fa04 Mon Sep 17 00:00:00 2001 From: sunce86 Date: Mon, 4 Sep 2023 13:40:39 +0200 Subject: [PATCH] refactor --- crates/shared/src/arguments.rs | 6 + crates/shared/src/zeroex_api.rs | 5 +- crates/solver/src/liquidity/zeroex.rs | 465 ++++++++++++++------------ crates/solver/src/run.rs | 1 + 4 files changed, 252 insertions(+), 225 deletions(-) diff --git a/crates/shared/src/arguments.rs b/crates/shared/src/arguments.rs index 242008a18b..275d4bc2d4 100644 --- a/crates/shared/src/arguments.rs +++ b/crates/shared/src/arguments.rs @@ -238,12 +238,17 @@ pub struct Arguments { #[clap(long, env, default_value = "ParaSwapPool4", use_value_delimiter = true)] pub disabled_paraswap_dexs: Vec, + /// The 0x HTTP API URL. #[clap(long, env)] pub zeroex_url: Option, #[clap(long, env)] pub zeroex_api_key: Option, + /// The 0x Websocket API URL. + #[clap(long, env)] + pub zeroex_ws_url: Option, + /// If solvers should use internal buffers to improve solution quality. #[clap(long, env, action = clap::ArgAction::Set, default_value = "false")] pub use_internal_buffers: bool, @@ -434,6 +439,7 @@ impl Display for Arguments { display_list(f, "disabled_paraswap_dexs", &self.disabled_paraswap_dexs)?; display_option(f, "zeroex_url", &self.zeroex_url)?; display_secret_option(f, "zeroex_api_key", &self.zeroex_api_key)?; + display_option(f, "zeroex_ws_url", &self.zeroex_ws_url)?; writeln!(f, "use_internal_buffers: {}", self.use_internal_buffers)?; writeln!(f, "balancer_factories: {:?}", self.balancer_factories)?; display_list( diff --git a/crates/shared/src/zeroex_api.rs b/crates/shared/src/zeroex_api.rs index 80cb2a7d87..295d6645fe 100644 --- a/crates/shared/src/zeroex_api.rs +++ b/crates/shared/src/zeroex_api.rs @@ -620,7 +620,8 @@ pub mod websocket { // all numbers are at most u128::MAX so none of these operations can overflow let scaled_maker_amount = U256::from(self.order.maker_amount) * U256::from(self.metadata.remaining_fillable_taker_amount) - / U256::from(self.order.taker_amount); + .checked_div(self.order.taker_amount.into()) + .context("taker_amount is zero")?; // `scaled_maker_amount` is at most as big as `maker_amount` which already fits // in an u128 @@ -636,7 +637,7 @@ pub mod websocket { remaining_fillable_taker_amount: order_record .metadata .remaining_fillable_taker_amount, - state: Default::default(), + state: OrderState::Added, }, order: order_record.order, } diff --git a/crates/solver/src/liquidity/zeroex.rs b/crates/solver/src/liquidity/zeroex.rs index 4060b74621..c89dea6d0b 100644 --- a/crates/solver/src/liquidity/zeroex.rs +++ b/crates/solver/src/liquidity/zeroex.rs @@ -15,7 +15,7 @@ use { futures::{SinkExt, StreamExt}, model::{order::OrderKind, TokenPair}, primitive_types::{H160, U256}, - reqwest::Url, + reqwest::{IntoUrl, Url}, shared::{ ethrpc::Web3, http_solver::model::TokenAmount, @@ -37,12 +37,224 @@ use { tracing::Instrument, }; +const DEFAULT_ZEROEX_WEBSOCKET_API: &str = "wss://api.0x.org/orderbook/v1"; + +type Socket = WebSocketStream>; + +#[derive(Clone)] +pub struct ZeroExWebsocketApi { + url: Url, +} + +impl ZeroExWebsocketApi { + pub fn new(url: impl IntoUrl) -> Self { + Self { + url: url + .into_url() + .unwrap_or(Url::parse(DEFAULT_ZEROEX_WEBSOCKET_API).unwrap()), + } + } +} + +impl ZeroExWebsocketApi { + /// Creates a websocket connection to 0x and subscribes to orderbook + /// updates. Once this function is done, the websocket is open and ready + /// to receive messages. + async fn open_socket(&self) -> Result { + let mut socket = tokio_tungstenite::connect_async(self.url.clone()).await?.0; + + // Construct the subscription message to fetch all orders + let subscription_msg = serde_json::json!({ + "type": "subscribe", + "channel": "orders", + "requestId": "cowswap", + }); + + socket + .send(Message::Text(subscription_msg.to_string())) + .await?; + + Ok(socket) + } +} + +#[derive(Debug)] +enum UpdateError { + // The websocket is closed on server side, with optional reason + SocketClosed(Option), + // Received a message type that we don't handle + UnsupportedMessage, + // Received proper `text` type of message but it can't be deserialized (mailformed or changed + // format) + DeserializeError(anyhow::Error), + Other(anyhow::Error), +} + +impl From for UpdateError { + fn from(err: anyhow::Error) -> Self { + Self::Other(err) + } +} + +impl From for anyhow::Error { + fn from(err: UpdateError) -> Self { + match err { + UpdateError::SocketClosed(err) => anyhow::anyhow!("socket closed, reason {:?}", err), + UpdateError::UnsupportedMessage => anyhow::anyhow!("unsupported message"), + UpdateError::DeserializeError(err) => err, + UpdateError::Other(err) => err, + } + } +} + +#[derive(Clone)] +struct ZeroExCache { + /// Fetch orders that can be filled by this address + sender: Address, + orders: Arc>>, +} + +impl ZeroExCache { + pub fn new(sender: Address) -> Self { + Self { + sender, + orders: Arc::new(Mutex::new(HashMap::new())), + } + } + + /// Calls the 0x API to get all orders and initializes the cache with them. + async fn init_cache(&self, api: Arc) -> Result<()> { + { + // init can be called multiple times, so clear the cache first + // clearing is done before fetching 0x orders because fetching can keep failing + // for a long time and we don't want to keep stale orders + let mut cache = self.orders.lock().await; + cache.clear(); + } + + let queries = &[ + // orders fillable by anyone + OrdersQuery::default(), + // orders fillable only by our settlement contract + OrdersQuery { + sender: Some(self.sender), + ..Default::default() + }, + ]; + + let zeroex_orders = + futures::future::try_join_all(queries.iter().map(|query| api.get_orders(query))) + .await + .context("failed to fetch 0x limit orders")?; + let zeroex_orders: Vec<_> = zeroex_orders.into_iter().flatten().collect(); + + let mut cache = self.orders.lock().await; + cache.extend( + zeroex_orders + .into_iter() + .map(|order| (hex::encode(&order.metadata.order_hash), order.into())), + ); + tracing::debug!("Initialized 0x cache with {} orders", cache.len()); + + Ok(()) + } + + /// Creates a websocket connection and reads the messages from it. + async fn connect_and_update_cache(&self, api: ZeroExWebsocketApi) -> Result<(), UpdateError> { + let mut socket = api.open_socket().await?; + let result = self.update_cache(&mut socket).await; + // this call will error if the socket is already closed but that's fine + let _ = socket.close(None).await; + result + } + + /// Indifinitelly reads messages from the websocket and updates the cache. + async fn update_cache(&self, socket: &mut Socket) -> Result<(), UpdateError> { + while let Some(msg) = socket.next().await { + let msg = msg.context("websocket error")?; + let text = match msg { + Message::Text(text) => text, + Message::Close(frame) => { + return Err(UpdateError::SocketClosed( + frame.map(|frame| anyhow!(frame.reason)), + )) + } + Message::Ping(payload) => { + // send pong message to keep the connection alive + socket + .send(Message::Pong(payload)) + .await + .context("ping pong failure")?; + continue; + } + _ => { + tracing::error!("Received unsupported message {:?}", msg); + return Err(UpdateError::UnsupportedMessage); + } + }; + + let records = serde_json::from_str::(&text) + .with_context(|| format!("deserialization error, text received: {}", text)) + .map_err(UpdateError::DeserializeError)? + .payload; + + let mut cache = self.orders.lock().await; + for record in records { + match record.metadata.state { + OrderState::Added | OrderState::Updated | OrderState::Fillable => { + cache.insert(hex::encode(record.metadata.order_hash.clone()), record); + } + OrderState::Expired => { + cache.remove(&hex::encode(record.metadata.order_hash)); + } + } + } + } + + Ok(()) + } + + /// Main update loop + pub async fn update(&self, init_api: Arc, update_api: ZeroExWebsocketApi) { + let mut backoff = DEFAULT_BACKOFF; + // loop needed for reconnections + loop { + // initialize cache by fetching all existing orders via http api + if let Err(err) = self.init_cache(init_api.clone()).await { + tracing::warn!( + "Error initializing 0x cache: {:?}, retrying with backoff {:?} ...", + err, + backoff + ); + tokio::time::sleep(backoff).await; + backoff *= 2; + continue; + } + + backoff = DEFAULT_BACKOFF; + // from now on, rely on websocket connection to do incremental updates + if let Err(err) = self.connect_and_update_cache(update_api.clone()).await { + tracing::debug!("Error updating 0x cache, reconnecting... {:?}", err); + tokio::time::sleep(tokio::time::Duration::from_secs(3)).await; + } + } + } + + pub async fn orders(&self) -> Vec { + self.orders + .lock() + .await + .values() + .cloned() + .collect::>() + } +} + pub struct ZeroExLiquidity { pub zeroex: IZeroEx, pub gpv2: GPv2Settlement, pub allowance_manager: Box, - // cached orders, updated by background task - pub cache: Arc>>, + cache: ZeroExCache, } // hash generated by the 0x api @@ -51,40 +263,28 @@ type OrderBuckets = HashMap<(H160, H160), Vec>; const DEFAULT_BACKOFF: Duration = Duration::from_secs(3); impl ZeroExLiquidity { - pub fn new(web3: Web3, api: Arc, zeroex: IZeroEx, gpv2: GPv2Settlement) -> Self { + pub fn new( + web3: Web3, + init_api: Arc, + zeroex: IZeroEx, + gpv2: GPv2Settlement, + ws_url: impl IntoUrl, + ) -> Self { let allowance_manager = AllowanceManager::new(web3, gpv2.address()); - let cache: Arc>> = Default::default(); - let inner = cache.clone(); - let api = api.clone(); - let sender = gpv2.address(); - - tokio::task::spawn( - async move { - let mut backoff = DEFAULT_BACKOFF; - // loop needed for reconnections - loop { - // initialize cache by fetching all existing orders via http api - if let Err(err) = init_cache(api.clone(), inner.clone(), sender).await { - tracing::warn!( - "Error initializing 0x cache: {:?}, retrying with backoff {:?} ...", - err, - backoff - ); - tokio::time::sleep(backoff).await; - backoff *= 2; - continue; - } - - backoff = DEFAULT_BACKOFF; - // from now on, rely on websocket connection to do incremental updates - if let Err(err) = connect_and_update_cache(inner.clone()).await { - tracing::debug!("Error updating 0x cache, reconnecting... {:?}", err); - tokio::time::sleep(tokio::time::Duration::from_secs(3)).await; - } + let cache = ZeroExCache::new(gpv2.address()); + + { + // spawn background task to continually update the cache + let inner = cache.clone(); + let update_api = ZeroExWebsocketApi::new(ws_url); + tokio::task::spawn( + async move { + inner.update(init_api, update_api).await; } - } - .instrument(tracing::debug_span!("0x cache updater")), - ); + .instrument(tracing::debug_span!("0x cache updater")), + ); + } + Self { zeroex, gpv2, @@ -127,160 +327,6 @@ impl ZeroExLiquidity { } } -/// Calls the 0x API to get all orders and initializes the cache with them. -async fn init_cache( - api: Arc, - cache: Arc>>, - sender: Address, -) -> Result<()> { - { - // init can be called multiple times, so clear the cache first - // clearing is done before fetching 0x orders because fetching can keep failing - // for a long time and we don't want to keep stale orders - let mut cache = cache.lock().await; - cache.clear(); - } - - let queries = &[ - // orders fillable by anyone - OrdersQuery::default(), - // orders fillable only by our settlement contract - OrdersQuery { - sender: Some(sender), - ..Default::default() - }, - ]; - - let zeroex_orders = - futures::future::try_join_all(queries.iter().map(|query| api.get_orders(query))) - .await - .context("failed to fetch 0x limit orders")?; - let zeroex_orders: Vec<_> = zeroex_orders.into_iter().flatten().collect(); - - let mut cache = cache.lock().await; - cache.extend( - zeroex_orders - .into_iter() - .map(|order| (hex::encode(&order.metadata.order_hash), order.into())), - ); - tracing::debug!("Initialized 0x cache with {} orders", cache.len()); - - Ok(()) -} - -type Socket = WebSocketStream>; - -/// Creates a websocket connection to 0x and subscribes to orderbook updates. -/// Once this function is done, the websocket is open and ready to receive -/// messages. -async fn connect_socket() -> Result { - let url = Url::parse("wss://api.0x.org/orderbook/v1")?; - let mut socket = tokio_tungstenite::connect_async(url).await?.0; - - // Construct the subscription message to fetch all orders - let subscription_msg = serde_json::json!({ - "type": "subscribe", - "channel": "orders", - "requestId": "cowswap", - }); - - socket - .send(Message::Text(subscription_msg.to_string())) - .await?; - - Ok(socket) -} - -/// Creates a websocket connection and reads the messages from it. -/// The messages are then used to update the cache. -/// If the connection is lost, calling again this function will reconnect. -async fn connect_and_update_cache( - cache: Arc>>, -) -> Result<(), UpdateError> { - let mut socket = connect_socket().await?; - let result = update_cache(&mut socket, cache.clone()).await; - // this call will error if the socket is already closed but that's fine - let _ = socket.close(None).await; - result -} - -#[derive(Debug)] -enum UpdateError { - // The websocket is closed on server side, with optional reason - SocketClosed(Option), - // Received a message type that we don't handle - UnsupportedMessage, - // Received proper `text` type of message but it can't be deserialized (mailformed or changed - // format) - DeserializeError(anyhow::Error), - Other(anyhow::Error), -} - -impl From for UpdateError { - fn from(err: anyhow::Error) -> Self { - Self::Other(err) - } -} - -impl From for anyhow::Error { - fn from(err: UpdateError) -> Self { - match err { - UpdateError::SocketClosed(err) => anyhow::anyhow!("socket closed, reason {:?}", err), - UpdateError::UnsupportedMessage => anyhow::anyhow!("unsupported message"), - UpdateError::DeserializeError(err) => err, - UpdateError::Other(err) => err, - } - } -} - -async fn update_cache( - socket: &mut Socket, - cache: Arc>>, -) -> Result<(), UpdateError> { - while let Some(msg) = socket.next().await { - let msg = msg.context("websocket error")?; - let text = match msg { - Message::Text(text) => text, - Message::Close(frame) => { - return Err(UpdateError::SocketClosed( - frame.map(|frame| anyhow!(frame.reason)), - )) - } - Message::Ping(payload) => { - // send pong message to keep the connection alive - socket - .send(Message::Pong(payload)) - .await - .context("ping pong failure")?; - continue; - } - _ => { - tracing::debug!("Received unsupported message {:?}", msg); - return Err(UpdateError::UnsupportedMessage); - } - }; - - let records = serde_json::from_str::(&text) - .with_context(|| format!("deserialization error, text received: {}", text)) - .map_err(UpdateError::DeserializeError)? - .payload; - - let mut cache = cache.lock().await; - for record in records { - match record.metadata.state { - OrderState::Added | OrderState::Updated | OrderState::Fillable => { - cache.insert(hex::encode(record.metadata.order_hash.clone()), record); - } - OrderState::Expired => { - cache.remove(&hex::encode(record.metadata.order_hash)); - } - } - } - } - - Ok(()) -} - #[async_trait::async_trait] impl LiquidityCollecting for ZeroExLiquidity { async fn get_liquidity( @@ -288,13 +334,7 @@ impl LiquidityCollecting for ZeroExLiquidity { pairs: HashSet, _block: Block, ) -> Result> { - let zeroex_orders = self - .cache - .lock() - .await - .values() - .cloned() - .collect::>(); + let zeroex_orders = self.cache.orders().await; tracing::debug!("Fetched {} orders from 0x", zeroex_orders.len()); let order_buckets = generate_order_buckets(zeroex_orders.into_iter(), pairs); @@ -619,46 +659,25 @@ pub mod tests { #[tokio::test] #[ignore] async fn connect_and_update_cache_test() { - let cache: Arc>> = Default::default(); - let inner = cache.clone(); - let api = Arc::new(DefaultZeroExApi::test()); - let sender = { let transport = create_env_test_transport(); let web3 = Web3::new(transport); GPv2Settlement::deployed(&web3).await.unwrap().address() }; + let cache = ZeroExCache::new(sender); + let init_api = Arc::new(DefaultZeroExApi::test()); + let update_api = ZeroExWebsocketApi::new(DEFAULT_ZEROEX_WEBSOCKET_API); + let inner = cache.clone(); tokio::task::spawn(async move { - let mut backoff = Duration::from_secs(3); - - loop { - // initialize cache by fetching all existing orders via http api - if let Err(err) = init_cache(api.clone(), inner.clone(), sender).await { - println!( - "error initializing cache: {}, reconnecting with backoff {:?}", - err, backoff - ); - tokio::time::sleep(backoff).await; - backoff *= 2; - continue; - } - // from now on, rely on websocket connection to do incremental updates - if let Err(err) = connect_and_update_cache(inner.clone()).await { - println!("Error updating 0x cache, reconnecting... {:?}", err); - tokio::time::sleep(tokio::time::Duration::from_secs(3)).await; - } - } + inner.update(init_api, update_api).await; }); // read cache from outside loop { tokio::time::sleep(Duration::from_secs(3)).await; - let cache_size = { - let cache = cache.lock().await; - cache.len() - }; + let cache_size = cache.orders().await.len(); println!("reader size: {}", cache_size); } } diff --git a/crates/solver/src/run.rs b/crates/solver/src/run.rs index b20024b4e6..10f2f8e6ba 100644 --- a/crates/solver/src/run.rs +++ b/crates/solver/src/run.rs @@ -397,6 +397,7 @@ pub async fn run(args: Arguments) { zeroex_api, contracts::IZeroEx::deployed(&web3).await.unwrap(), settlement_contract.clone(), + args.shared.zeroex_ws_url.unwrap_or_default(), ))); }