From 7c28327564eb6141222b69314755721ea75221cf Mon Sep 17 00:00:00 2001 From: Niklas Adolfsson Date: Tue, 23 Jan 2024 18:05:44 +0100 Subject: [PATCH 01/10] initial commit --- Cargo.lock | 41 ++++++++++++++ subxt/Cargo.toml | 8 ++- subxt/examples/reconnecting_rpc_client.rs | 66 +++++++++++++++++++++++ subxt/src/backend/rpc/mod.rs | 3 +- subxt/src/backend/rpc/rpc_client_t.rs | 48 +++++++++++++++++ subxt/src/error/mod.rs | 3 ++ 6 files changed, 166 insertions(+), 3 deletions(-) create mode 100644 subxt/examples/reconnecting_rpc_client.rs diff --git a/Cargo.lock b/Cargo.lock index 153a437671..655fd48aaa 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2251,6 +2251,7 @@ dependencies = [ "jsonrpsee-core", "jsonrpsee-http-client", "jsonrpsee-types", + "jsonrpsee-ws-client", ] [[package]] @@ -2334,6 +2335,19 @@ dependencies = [ "thiserror", ] +[[package]] +name = "jsonrpsee-ws-client" +version = "0.21.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "073c077471e89c4b511fa88b3df9a0f0abdf4a0a2e6683dd2ab36893af87bb2d" +dependencies = [ + "http", + "jsonrpsee-client-transport", + "jsonrpsee-core", + "jsonrpsee-types", + "url", +] + [[package]] name = "keccak" version = "0.1.4" @@ -3117,6 +3131,21 @@ dependencies = [ "crossbeam-utils", ] +[[package]] +name = "reconnecting-jsonrpsee-ws-client" +version = "0.1.0" +source = "git+https://github.com/niklasad1/reconnecting-jsonrpsee-ws-client#2ba3e575e7d3da24fafa3abcde46e9d42975ddd7" +dependencies = [ + "futures", + "jsonrpsee", + "serde_json", + "thiserror", + "tokio", + "tokio-retry", + "tokio-stream", + "tracing", +] + [[package]] name = "redox_syscall" version = "0.4.1" @@ -4433,6 +4462,7 @@ dependencies = [ "jsonrpsee", "parity-scale-codec", "primitive-types", + "reconnecting-jsonrpsee-ws-client", "scale-bits", "scale-decode", "scale-encode", @@ -4727,6 +4757,17 @@ dependencies = [ "syn 2.0.48", ] +[[package]] +name = "tokio-retry" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7f57eb36ecbe0fc510036adff84824dd3c24bb781e21bfa67b69d556aa85214f" +dependencies = [ + "pin-project", + "rand 0.8.5", + "tokio", +] + [[package]] name = "tokio-rustls" version = "0.24.1" diff --git a/subxt/Cargo.toml b/subxt/Cargo.toml index 0686a8c897..0bf92e97d6 100644 --- a/subxt/Cargo.toml +++ b/subxt/Cargo.toml @@ -20,7 +20,7 @@ workspace = true [features] # For dev and documentation reasons we enable more features than are often desired. # it's recommended to use `--no-default-features` and then select what you need. -default = ["jsonrpsee", "native"] +default = ["jsonrpsee", "native", "reconnecting-rpc-client"] # Enable this for native (ie non web/wasm builds). # Exactly 1 of "web" and "native" is expected. @@ -42,6 +42,9 @@ web = [ "instant/wasm-bindgen" ] +# Enable this to use the reconnecting rpc client +reconnecting-rpc-client = ["dep:reconnecting-jsonrpsee-ws-client"] + # Enable this to use jsonrpsee (allowing for example `OnlineClient::from_url`). jsonrpsee = ["dep:jsonrpsee"] @@ -103,6 +106,9 @@ subxt-lightclient = { workspace = true, optional = true, default-features = fals # Light client support: tokio-stream = { workspace = true, optional = true } +# Reconnecting jsonrpc client +reconnecting-jsonrpsee-ws-client = { git = "https://github.com/niklasad1/reconnecting-jsonrpsee-ws-client", optional = true } + # For parsing urls to disallow insecure schemes url = { workspace = true } diff --git a/subxt/examples/reconnecting_rpc_client.rs b/subxt/examples/reconnecting_rpc_client.rs new file mode 100644 index 0000000000..51c6050400 --- /dev/null +++ b/subxt/examples/reconnecting_rpc_client.rs @@ -0,0 +1,66 @@ +#![allow(missing_docs)] + +use std::time::Duration; + +use reconnecting_jsonrpsee_ws_client::{Client, ExponentialBackoff, PingConfig}; +use subxt::backend::rpc::RpcClient; +use subxt::{OnlineClient, PolkadotConfig}; +use subxt_signer::sr25519::dev; + +// Generate an interface that we can use from the node's metadata. +#[subxt::subxt(runtime_metadata_path = "../artifacts/polkadot_metadata_small.scale")] +pub mod polkadot {} + +#[tokio::main] +async fn main() -> Result<(), Box> { + // Create a new client with with a reconnecting RPC client. + let rpc = Client::builder() + // Reconnect with exponential backoff. + .retry_policy(ExponentialBackoff::from_millis(100)) + // Send period WebSocket pings/pongs every 6th second and if it's not ACK:ed in 30 seconds + // then disconnect. + // + // This is just a way to ensure that the connection isn't idle if no message is sent that often + .enable_ws_ping( + PingConfig::new() + .ping_interval(Duration::from_secs(6)) + .inactive_limit(Duration::from_secs(30)), + ) + // There are other configurations as well that can be found here: + // + .build("ws://localhost:9944".to_string()) + .await?; + + let api: OnlineClient = + OnlineClient::from_rpc_client(RpcClient::new(rpc.clone())).await?; + + // Build a balance transfer extrinsic. + let dest = dev::bob().public_key().into(); + let balance_transfer_tx = polkadot::tx().balances().transfer_allow_death(dest, 10_000); + + // Submit the balance transfer extrinsic from Alice, and wait for it to be successful + // and in a finalized block. We get back the extrinsic events if all is well. + let from = dev::alice(); + + // When the connection is lost, a error RpcError::DisconnectWillReconnect is emitted on the stream. + // in such scenarios the error will be seen here. + // + // In such scenario it's possible that messages are lost when reconnecting + // and if that's acceptable you may just ignore that error message. + let events = api + .tx() + .sign_and_submit_then_watch_default(&balance_transfer_tx, &from) + .await? + .wait_for_finalized_success() + .await?; + + // Find a Transfer event and print it. + let transfer_event = events.find_first::()?; + if let Some(event) = transfer_event { + println!("Balance transfer success: {event:?}"); + } + + println!("rpc client reconnected: {}", rpc.reconnect_count()); + + Ok(()) +} diff --git a/subxt/src/backend/rpc/mod.rs b/subxt/src/backend/rpc/mod.rs index 12910939e7..094a22d326 100644 --- a/subxt/src/backend/rpc/mod.rs +++ b/subxt/src/backend/rpc/mod.rs @@ -63,6 +63,5 @@ crate::macros::cfg_jsonrpsee! { mod rpc_client; mod rpc_client_t; -pub use rpc_client_t::{RawRpcFuture, RawRpcSubscription, RawValue, RpcClientT}; - pub use rpc_client::{rpc_params, RpcClient, RpcParams, RpcSubscription}; +pub use rpc_client_t::{RawRpcFuture, RawRpcSubscription, RawValue, RpcClientT}; diff --git a/subxt/src/backend/rpc/rpc_client_t.rs b/subxt/src/backend/rpc/rpc_client_t.rs index 98d349d17c..096b1ad71e 100644 --- a/subxt/src/backend/rpc/rpc_client_t.rs +++ b/subxt/src/backend/rpc/rpc_client_t.rs @@ -101,3 +101,51 @@ impl RpcClientT for Box { (**self).subscribe_raw(sub, params, unsub) } } + +#[cfg(feature = "reconnecting-rpc-client")] +impl RpcClientT for reconnecting_jsonrpsee_ws_client::Client { + fn request_raw<'a>( + &'a self, + method: &'a str, + params: Option>, + ) -> RawRpcFuture<'a, Box> { + use futures::FutureExt; + use reconnecting_jsonrpsee_ws_client::RpcParams; + + async { + self.request_raw(method.to_string(), RpcParams::new(params)) + .await + .map_err(|e| RpcError::ClientError(Box::new(e))) + } + .boxed() + } + + fn subscribe_raw<'a>( + &'a self, + sub: &'a str, + params: Option>, + unsub: &'a str, + ) -> RawRpcFuture<'a, RawRpcSubscription> { + use futures::{FutureExt, StreamExt, TryStreamExt}; + use reconnecting_jsonrpsee_ws_client::{RpcParams, SubscriptionId}; + + async { + let sub = self + .subscribe_raw(sub.to_string(), RpcParams::new(params), unsub.to_string()) + .await + .map_err(|e| RpcError::ClientError(Box::new(e)))?; + + let id = match sub.id() { + SubscriptionId::Num(n) => n.to_string(), + SubscriptionId::Str(s) => s.to_string(), + }; + let stream = sub.map_err(|_| RpcError::DisconnectedWillReconnect).boxed(); + + Ok(RawRpcSubscription { + stream, + id: Some(id), + }) + } + .boxed() + } +} diff --git a/subxt/src/error/mod.rs b/subxt/src/error/mod.rs index 78c5528012..10f635c365 100644 --- a/subxt/src/error/mod.rs +++ b/subxt/src/error/mod.rs @@ -120,6 +120,9 @@ pub enum RpcError { /// The requested URL is insecure. #[error("RPC error: insecure URL: {0}")] InsecureUrl(String), + /// The connection was lost and automatically reconnected. + #[error("RPC error: the connection was lost and automatically reconnected")] + DisconnectedWillReconnect, } impl RpcError { From bafcb78376fa2400b7bae0232193ef564b64ebf3 Mon Sep 17 00:00:00 2001 From: Niklas Adolfsson Date: Wed, 24 Jan 2024 18:06:14 +0100 Subject: [PATCH 02/10] update to reconnecting-ws-client v0.2 --- Cargo.lock | 5 +++-- subxt/Cargo.toml | 2 +- subxt/src/backend/rpc/rpc_client_t.rs | 7 +++---- 3 files changed, 7 insertions(+), 7 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 655fd48aaa..60c55c06e5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3133,8 +3133,9 @@ dependencies = [ [[package]] name = "reconnecting-jsonrpsee-ws-client" -version = "0.1.0" -source = "git+https://github.com/niklasad1/reconnecting-jsonrpsee-ws-client#2ba3e575e7d3da24fafa3abcde46e9d42975ddd7" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7265f5d33f189e19e7242325ff6874e540aa9c46ae8f44a52412fb59faafbe00" dependencies = [ "futures", "jsonrpsee", diff --git a/subxt/Cargo.toml b/subxt/Cargo.toml index 0bf92e97d6..5ab0f49d53 100644 --- a/subxt/Cargo.toml +++ b/subxt/Cargo.toml @@ -107,7 +107,7 @@ subxt-lightclient = { workspace = true, optional = true, default-features = fals tokio-stream = { workspace = true, optional = true } # Reconnecting jsonrpc client -reconnecting-jsonrpsee-ws-client = { git = "https://github.com/niklasad1/reconnecting-jsonrpsee-ws-client", optional = true } +reconnecting-jsonrpsee-ws-client = { version = "0.2", optional = true } # For parsing urls to disallow insecure schemes url = { workspace = true } diff --git a/subxt/src/backend/rpc/rpc_client_t.rs b/subxt/src/backend/rpc/rpc_client_t.rs index 096b1ad71e..908f203482 100644 --- a/subxt/src/backend/rpc/rpc_client_t.rs +++ b/subxt/src/backend/rpc/rpc_client_t.rs @@ -110,10 +110,9 @@ impl RpcClientT for reconnecting_jsonrpsee_ws_client::Client { params: Option>, ) -> RawRpcFuture<'a, Box> { use futures::FutureExt; - use reconnecting_jsonrpsee_ws_client::RpcParams; async { - self.request_raw(method.to_string(), RpcParams::new(params)) + self.request_raw(method.to_string(), params) .await .map_err(|e| RpcError::ClientError(Box::new(e))) } @@ -127,11 +126,11 @@ impl RpcClientT for reconnecting_jsonrpsee_ws_client::Client { unsub: &'a str, ) -> RawRpcFuture<'a, RawRpcSubscription> { use futures::{FutureExt, StreamExt, TryStreamExt}; - use reconnecting_jsonrpsee_ws_client::{RpcParams, SubscriptionId}; + use reconnecting_jsonrpsee_ws_client::SubscriptionId; async { let sub = self - .subscribe_raw(sub.to_string(), RpcParams::new(params), unsub.to_string()) + .subscribe_raw(sub.to_string(), params, unsub.to_string()) .await .map_err(|e| RpcError::ClientError(Box::new(e)))?; From eb7fcd3ff443b1c01b4ceb0ccf8414127d31591e Mon Sep 17 00:00:00 2001 From: Niklas Adolfsson Date: Thu, 25 Jan 2024 15:16:59 +0100 Subject: [PATCH 03/10] re-export: reconnecting_rpc_client behind feature --- subxt/Cargo.toml | 9 ++- subxt/examples/reconnecting_rpc_client.rs | 68 ++++++++++++++++++----- subxt/src/backend/rpc/mod.rs | 4 ++ subxt/src/backend/rpc/rpc_client_t.rs | 48 +++++++++++++++- subxt/src/error/mod.rs | 6 +- subxt/src/macros.rs | 16 +++++- 6 files changed, 128 insertions(+), 23 deletions(-) diff --git a/subxt/Cargo.toml b/subxt/Cargo.toml index 5ab0f49d53..7a8796a03d 100644 --- a/subxt/Cargo.toml +++ b/subxt/Cargo.toml @@ -20,7 +20,7 @@ workspace = true [features] # For dev and documentation reasons we enable more features than are often desired. # it's recommended to use `--no-default-features` and then select what you need. -default = ["jsonrpsee", "native", "reconnecting-rpc-client"] +default = ["jsonrpsee", "native"] # Enable this for native (ie non web/wasm builds). # Exactly 1 of "web" and "native" is expected. @@ -106,7 +106,7 @@ subxt-lightclient = { workspace = true, optional = true, default-features = fals # Light client support: tokio-stream = { workspace = true, optional = true } -# Reconnecting jsonrpc client +# Reconnecting jsonrpc ws client reconnecting-jsonrpsee-ws-client = { version = "0.2", optional = true } # For parsing urls to disallow insecure schemes @@ -144,6 +144,11 @@ name = "light_client_parachains" path = "examples/light_client_parachains.rs" required-features = ["unstable-light-client", "jsonrpsee", "native"] +[[example]] +name = "reconnecting_rpc_client" +path = "examples/reconnecting_rpc_client.rs" +required-features = ["reconnecting-rpc-client"] + [package.metadata.docs.rs] features = ["default", "substrate-compat", "unstable-light-client"] rustdoc-args = ["--cfg", "docsrs"] diff --git a/subxt/examples/reconnecting_rpc_client.rs b/subxt/examples/reconnecting_rpc_client.rs index 51c6050400..f85fea2ff9 100644 --- a/subxt/examples/reconnecting_rpc_client.rs +++ b/subxt/examples/reconnecting_rpc_client.rs @@ -1,10 +1,17 @@ +//! Example to utilize the `reconnecting rpc client` in subxt +//! which hidden behind behind `--feature reconnecting-rpc-client` +//! +//! To utilize full logs from the RPC client use: +//! `RUST_LOG="jsonrpsee=trace,reconnecting_jsonrpsee_ws_client=trace"` + #![allow(missing_docs)] use std::time::Duration; -use reconnecting_jsonrpsee_ws_client::{Client, ExponentialBackoff, PingConfig}; +use subxt::backend::rpc::reconnecting_rpc_client::{Client, ExponentialBackoff, PingConfig}; use subxt::backend::rpc::RpcClient; -use subxt::{OnlineClient, PolkadotConfig}; +use subxt::error::{Error, RpcError}; +use subxt::{tx::TxStatus, OnlineClient, PolkadotConfig}; use subxt_signer::sr25519::dev; // Generate an interface that we can use from the node's metadata. @@ -13,10 +20,12 @@ pub mod polkadot {} #[tokio::main] async fn main() -> Result<(), Box> { + tracing_subscriber::fmt::init(); + // Create a new client with with a reconnecting RPC client. let rpc = Client::builder() // Reconnect with exponential backoff. - .retry_policy(ExponentialBackoff::from_millis(100)) + .retry_policy(ExponentialBackoff::from_millis(100).max_delay(Duration::from_secs(60))) // Send period WebSocket pings/pongs every 6th second and if it's not ACK:ed in 30 seconds // then disconnect. // @@ -42,25 +51,56 @@ async fn main() -> Result<(), Box> { // and in a finalized block. We get back the extrinsic events if all is well. let from = dev::alice(); + let mut balance_transfer_progress = api + .tx() + .sign_and_submit_then_watch_default(&balance_transfer_tx, &from) + .await?; + // When the connection is lost, a error RpcError::DisconnectWillReconnect is emitted on the stream. // in such scenarios the error will be seen here. // // In such scenario it's possible that messages are lost when reconnecting // and if that's acceptable you may just ignore that error message. - let events = api - .tx() - .sign_and_submit_then_watch_default(&balance_transfer_tx, &from) - .await? - .wait_for_finalized_success() - .await?; + while let Some(status) = balance_transfer_progress.next().await { + match status { + // It's finalized in a block! + Ok(TxStatus::InFinalizedBlock(in_block)) => { + println!( + "Transaction {:?} is finalized in block {:?}", + in_block.extrinsic_hash(), + in_block.block_hash() + ); + + // grab the events and fail if no ExtrinsicSuccess event seen: + let events = in_block.wait_for_success().await?; + // We can look for events (this uses the static interface; we can also iterate + // over them and dynamically decode them): + let transfer_event = events.find_first::()?; - // Find a Transfer event and print it. - let transfer_event = events.find_first::()?; - if let Some(event) = transfer_event { - println!("Balance transfer success: {event:?}"); + if let Some(event) = transfer_event { + println!("Balance transfer success: {event:?}"); + } else { + println!("Failed to find Balances::Transfer Event"); + } + } + // Just log any other status we encounter: + Ok(other) => { + println!("Status: {other:?}"); + } + // In this example we just ignore when reconnections occurs + // but it's technically possible that we can lose + // messages on the subscription such as `InFinalizedBlock` + // when reconnecting. + Err(Error::Rpc(RpcError::DisconnectedWillReconnect(e))) => { + println!("{:?}", e); + } + Err(err) => { + return Err(err.into()); + } + } } - println!("rpc client reconnected: {}", rpc.reconnect_count()); + println!("RPC client reconnected `{}` times", rpc.reconnect_count()); Ok(()) } diff --git a/subxt/src/backend/rpc/mod.rs b/subxt/src/backend/rpc/mod.rs index 094a22d326..27d4405607 100644 --- a/subxt/src/backend/rpc/mod.rs +++ b/subxt/src/backend/rpc/mod.rs @@ -60,6 +60,10 @@ crate::macros::cfg_jsonrpsee! { mod jsonrpsee_impl; } +crate::macros::cfg_reconnecting_rpc_client! { + pub use reconnecting_jsonrpsee_ws_client as reconnecting_rpc_client; +} + mod rpc_client; mod rpc_client_t; diff --git a/subxt/src/backend/rpc/rpc_client_t.rs b/subxt/src/backend/rpc/rpc_client_t.rs index 908f203482..fcb46ab0ac 100644 --- a/subxt/src/backend/rpc/rpc_client_t.rs +++ b/subxt/src/backend/rpc/rpc_client_t.rs @@ -126,7 +126,49 @@ impl RpcClientT for reconnecting_jsonrpsee_ws_client::Client { unsub: &'a str, ) -> RawRpcFuture<'a, RawRpcSubscription> { use futures::{FutureExt, StreamExt, TryStreamExt}; - use reconnecting_jsonrpsee_ws_client::SubscriptionId; + use reconnecting_jsonrpsee_ws_client::{RpcError as ReconnError, SubscriptionId}; + + // Helper for more human friendly error messages. + // Workaround for: https://github.com/paritytech/jsonrpsee/issues/1280 + fn rpc_error(err: ReconnError) -> String { + match err { + ReconnError::Transport(e) => e.to_string(), + e @ ReconnError::InvalidSubscriptionId => e.to_string(), + ReconnError::InvalidRequestId(e) => e.to_string(), + ReconnError::Custom(e) => e, + ReconnError::RestartNeeded(inner_err) => { + // HACK: jsonrpsee has Error::RestartNeeded(Arc) and + // we need to visit one child because + // RestartNeeded only contains one layer of recursion. + // + // Thus, `RestartNeeded(RestartNeededArc)` + // and is unreachable. + match &*inner_err { + ReconnError::Transport(e) => e.to_string(), + e @ ReconnError::InvalidSubscriptionId => e.to_string(), + ReconnError::InvalidRequestId(e) => e.to_string(), + ReconnError::Custom(e) => e.to_string(), + // These variants are infallible. + ReconnError::RestartNeeded(_) + | ReconnError::RequestTimeout + | ReconnError::MaxSlotsExceeded + | ReconnError::Call(_) + | ReconnError::ParseError(_) + | ReconnError::HttpNotImplemented + | ReconnError::EmptyBatchRequest(_) + | ReconnError::RegisterMethod(_) => unreachable!(), + } + } + // These variants are infallible. + ReconnError::RequestTimeout + | ReconnError::MaxSlotsExceeded + | ReconnError::Call(_) + | ReconnError::ParseError(_) + | ReconnError::HttpNotImplemented + | ReconnError::EmptyBatchRequest(_) + | ReconnError::RegisterMethod(_) => unreachable!(), + } + } async { let sub = self @@ -138,7 +180,9 @@ impl RpcClientT for reconnecting_jsonrpsee_ws_client::Client { SubscriptionId::Num(n) => n.to_string(), SubscriptionId::Str(s) => s.to_string(), }; - let stream = sub.map_err(|_| RpcError::DisconnectedWillReconnect).boxed(); + let stream = sub + .map_err(|e| RpcError::DisconnectedWillReconnect(Some(rpc_error(e.0)))) + .boxed(); Ok(RawRpcSubscription { stream, diff --git a/subxt/src/error/mod.rs b/subxt/src/error/mod.rs index 10f635c365..01d1473158 100644 --- a/subxt/src/error/mod.rs +++ b/subxt/src/error/mod.rs @@ -6,8 +6,6 @@ mod dispatch_error; -use core::fmt::Debug; - crate::macros::cfg_unstable_light_client! { pub use crate::client::LightClientError; } @@ -121,8 +119,8 @@ pub enum RpcError { #[error("RPC error: insecure URL: {0}")] InsecureUrl(String), /// The connection was lost and automatically reconnected. - #[error("RPC error: the connection was lost and automatically reconnected")] - DisconnectedWillReconnect, + #[error("RPC error: the connection was lost `{0:?}`; reconnect automatically initiated")] + DisconnectedWillReconnect(Option), } impl RpcError { diff --git a/subxt/src/macros.rs b/subxt/src/macros.rs index a8d86ada69..73cd89780c 100644 --- a/subxt/src/macros.rs +++ b/subxt/src/macros.rs @@ -52,7 +52,21 @@ macro_rules! cfg_jsonrpsee_web { } } -pub(crate) use {cfg_feature, cfg_jsonrpsee, cfg_substrate_compat, cfg_unstable_light_client}; +#[allow(unused)] +macro_rules! cfg_reconnecting_rpc_client { + ($($item:item)*) => { + $( + #[cfg(all(feature = "reconnecting-rpc-client"))] + #[cfg_attr(docsrs, doc(cfg(feature = "reconnecting-rpc-client")))] + $item + )* + } +} + +pub(crate) use { + cfg_feature, cfg_jsonrpsee, cfg_reconnecting_rpc_client, cfg_substrate_compat, + cfg_unstable_light_client, +}; // Only used by light-client. #[allow(unused)] From cf888725eea25690e64425300a15901ed3dee3fe Mon Sep 17 00:00:00 2001 From: Niklas Adolfsson Date: Thu, 25 Jan 2024 15:44:58 +0100 Subject: [PATCH 04/10] add helper function for reconnect --- subxt/examples/reconnecting_rpc_client.rs | 21 +++++++++------------ subxt/src/error/mod.rs | 7 +++++++ 2 files changed, 16 insertions(+), 12 deletions(-) diff --git a/subxt/examples/reconnecting_rpc_client.rs b/subxt/examples/reconnecting_rpc_client.rs index f85fea2ff9..3665525bbf 100644 --- a/subxt/examples/reconnecting_rpc_client.rs +++ b/subxt/examples/reconnecting_rpc_client.rs @@ -10,7 +10,7 @@ use std::time::Duration; use subxt::backend::rpc::reconnecting_rpc_client::{Client, ExponentialBackoff, PingConfig}; use subxt::backend::rpc::RpcClient; -use subxt::error::{Error, RpcError}; +use subxt::error::{Error, RpcError, TransactionError}; use subxt::{tx::TxStatus, OnlineClient, PolkadotConfig}; use subxt_signer::sr25519::dev; @@ -25,7 +25,7 @@ async fn main() -> Result<(), Box> { // Create a new client with with a reconnecting RPC client. let rpc = Client::builder() // Reconnect with exponential backoff. - .retry_policy(ExponentialBackoff::from_millis(100).max_delay(Duration::from_secs(60))) + .retry_policy(ExponentialBackoff::from_millis(100).max_delay(Duration::from_secs(10))) // Send period WebSocket pings/pongs every 6th second and if it's not ACK:ed in 30 seconds // then disconnect. // @@ -65,27 +65,24 @@ async fn main() -> Result<(), Box> { match status { // It's finalized in a block! Ok(TxStatus::InFinalizedBlock(in_block)) => { - println!( - "Transaction {:?} is finalized in block {:?}", - in_block.extrinsic_hash(), - in_block.block_hash() - ); - // grab the events and fail if no ExtrinsicSuccess event seen: let events = in_block.wait_for_success().await?; // We can look for events (this uses the static interface; we can also iterate // over them and dynamically decode them): let transfer_event = events.find_first::()?; - if let Some(event) = transfer_event { - println!("Balance transfer success: {event:?}"); + if transfer_event.is_some() { + println!("Balance transfer success"); } else { println!("Failed to find Balances::Transfer Event"); } } // Just log any other status we encounter: - Ok(other) => { - println!("Status: {other:?}"); + // + // In this example we emit some important status handling for + // here such as Dropped, Invalid etc.... + Ok(_) => { + println!("New status"); } // In this example we just ignore when reconnections occurs // but it's technically possible that we can lose diff --git a/subxt/src/error/mod.rs b/subxt/src/error/mod.rs index 01d1473158..518dd36a66 100644 --- a/subxt/src/error/mod.rs +++ b/subxt/src/error/mod.rs @@ -98,6 +98,13 @@ impl From for Error { } } +impl Error { + /// Checks whether the error was caused by a RPC re-connection. + pub fn is_disconnected_will_reconnect(&self) -> bool { + matches!(self, Error::Rpc(RpcError::DisconnectedWillReconnect(_))) + } +} + /// An RPC error. Since we are generic over the RPC client that is used, /// the error is boxed and could be casted. #[derive(Debug, thiserror::Error)] From b85a5477cce9e68d807e553d04cbdefc7370f26f Mon Sep 17 00:00:00 2001 From: Niklas Adolfsson Date: Thu, 25 Jan 2024 16:47:56 +0100 Subject: [PATCH 05/10] fix nit in example --- subxt/examples/reconnecting_rpc_client.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/subxt/examples/reconnecting_rpc_client.rs b/subxt/examples/reconnecting_rpc_client.rs index 3665525bbf..31413fec74 100644 --- a/subxt/examples/reconnecting_rpc_client.rs +++ b/subxt/examples/reconnecting_rpc_client.rs @@ -10,7 +10,7 @@ use std::time::Duration; use subxt::backend::rpc::reconnecting_rpc_client::{Client, ExponentialBackoff, PingConfig}; use subxt::backend::rpc::RpcClient; -use subxt::error::{Error, RpcError, TransactionError}; +use subxt::error::{Error, RpcError}; use subxt::{tx::TxStatus, OnlineClient, PolkadotConfig}; use subxt_signer::sr25519::dev; From 282f86adb713659bb636377a7e46017062508b39 Mon Sep 17 00:00:00 2001 From: Niklas Adolfsson Date: Fri, 26 Jan 2024 17:13:49 +0100 Subject: [PATCH 06/10] simplify code without weird error fmt --- Cargo.lock | 2 +- subxt/examples/reconnecting_rpc_client.rs | 13 ++++--- subxt/src/backend/rpc/rpc_client_t.rs | 46 +---------------------- subxt/src/error/mod.rs | 4 +- 4 files changed, 13 insertions(+), 52 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 546dde6432..4933d578fe 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4790,7 +4790,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7f57eb36ecbe0fc510036adff84824dd3c24bb781e21bfa67b69d556aa85214f" dependencies = [ "pin-project", - "rand 0.8.5", + "rand", "tokio", ] diff --git a/subxt/examples/reconnecting_rpc_client.rs b/subxt/examples/reconnecting_rpc_client.rs index 31413fec74..f61b2ea842 100644 --- a/subxt/examples/reconnecting_rpc_client.rs +++ b/subxt/examples/reconnecting_rpc_client.rs @@ -24,7 +24,10 @@ async fn main() -> Result<(), Box> { // Create a new client with with a reconnecting RPC client. let rpc = Client::builder() - // Reconnect with exponential backoff. + // Reconnect with exponential backoff + // + // This API is "iterator-like" so one could limit it to only + // reconnect x times and then quit. .retry_policy(ExponentialBackoff::from_millis(100).max_delay(Duration::from_secs(10))) // Send period WebSocket pings/pongs every 6th second and if it's not ACK:ed in 30 seconds // then disconnect. @@ -57,10 +60,10 @@ async fn main() -> Result<(), Box> { .await?; // When the connection is lost, a error RpcError::DisconnectWillReconnect is emitted on the stream. - // in such scenarios the error will be seen here. + // In this example we just ignore it and print the why it was disconnected. // - // In such scenario it's possible that messages are lost when reconnecting - // and if that's acceptable you may just ignore that error message. + // If you want to quit on reconnects just use `balance_transfer_progress.wait_for_finalized_success().await` + // and remove the while loop below. while let Some(status) = balance_transfer_progress.next().await { match status { // It's finalized in a block! @@ -89,7 +92,7 @@ async fn main() -> Result<(), Box> { // messages on the subscription such as `InFinalizedBlock` // when reconnecting. Err(Error::Rpc(RpcError::DisconnectedWillReconnect(e))) => { - println!("{:?}", e); + println!("{e}"); } Err(err) => { return Err(err.into()); diff --git a/subxt/src/backend/rpc/rpc_client_t.rs b/subxt/src/backend/rpc/rpc_client_t.rs index fcb46ab0ac..09238e0c93 100644 --- a/subxt/src/backend/rpc/rpc_client_t.rs +++ b/subxt/src/backend/rpc/rpc_client_t.rs @@ -126,49 +126,7 @@ impl RpcClientT for reconnecting_jsonrpsee_ws_client::Client { unsub: &'a str, ) -> RawRpcFuture<'a, RawRpcSubscription> { use futures::{FutureExt, StreamExt, TryStreamExt}; - use reconnecting_jsonrpsee_ws_client::{RpcError as ReconnError, SubscriptionId}; - - // Helper for more human friendly error messages. - // Workaround for: https://github.com/paritytech/jsonrpsee/issues/1280 - fn rpc_error(err: ReconnError) -> String { - match err { - ReconnError::Transport(e) => e.to_string(), - e @ ReconnError::InvalidSubscriptionId => e.to_string(), - ReconnError::InvalidRequestId(e) => e.to_string(), - ReconnError::Custom(e) => e, - ReconnError::RestartNeeded(inner_err) => { - // HACK: jsonrpsee has Error::RestartNeeded(Arc) and - // we need to visit one child because - // RestartNeeded only contains one layer of recursion. - // - // Thus, `RestartNeeded(RestartNeededArc)` - // and is unreachable. - match &*inner_err { - ReconnError::Transport(e) => e.to_string(), - e @ ReconnError::InvalidSubscriptionId => e.to_string(), - ReconnError::InvalidRequestId(e) => e.to_string(), - ReconnError::Custom(e) => e.to_string(), - // These variants are infallible. - ReconnError::RestartNeeded(_) - | ReconnError::RequestTimeout - | ReconnError::MaxSlotsExceeded - | ReconnError::Call(_) - | ReconnError::ParseError(_) - | ReconnError::HttpNotImplemented - | ReconnError::EmptyBatchRequest(_) - | ReconnError::RegisterMethod(_) => unreachable!(), - } - } - // These variants are infallible. - ReconnError::RequestTimeout - | ReconnError::MaxSlotsExceeded - | ReconnError::Call(_) - | ReconnError::ParseError(_) - | ReconnError::HttpNotImplemented - | ReconnError::EmptyBatchRequest(_) - | ReconnError::RegisterMethod(_) => unreachable!(), - } - } + use reconnecting_jsonrpsee_ws_client::SubscriptionId; async { let sub = self @@ -181,7 +139,7 @@ impl RpcClientT for reconnecting_jsonrpsee_ws_client::Client { SubscriptionId::Str(s) => s.to_string(), }; let stream = sub - .map_err(|e| RpcError::DisconnectedWillReconnect(Some(rpc_error(e.0)))) + .map_err(|e| RpcError::DisconnectedWillReconnect(e.to_string())) .boxed(); Ok(RawRpcSubscription { diff --git a/subxt/src/error/mod.rs b/subxt/src/error/mod.rs index 518dd36a66..41dbc11d31 100644 --- a/subxt/src/error/mod.rs +++ b/subxt/src/error/mod.rs @@ -126,8 +126,8 @@ pub enum RpcError { #[error("RPC error: insecure URL: {0}")] InsecureUrl(String), /// The connection was lost and automatically reconnected. - #[error("RPC error: the connection was lost `{0:?}`; reconnect automatically initiated")] - DisconnectedWillReconnect(Option), + #[error("RPC error: the connection was lost `{0}`; reconnect automatically initiated")] + DisconnectedWillReconnect(String), } impl RpcError { From 57141cd9e2307b1a9abc1b8126afbdf38756b401 Mon Sep 17 00:00:00 2001 From: Niklas Adolfsson Date: Fri, 26 Jan 2024 17:24:50 +0100 Subject: [PATCH 07/10] address grumbles --- subxt/src/backend/rpc/mod.rs | 1 + .../rpc/reconnecting_jsonrpsee_impl.rs | 52 +++++++++++++++++++ subxt/src/backend/rpc/rpc_client_t.rs | 49 ----------------- 3 files changed, 53 insertions(+), 49 deletions(-) create mode 100644 subxt/src/backend/rpc/reconnecting_jsonrpsee_impl.rs diff --git a/subxt/src/backend/rpc/mod.rs b/subxt/src/backend/rpc/mod.rs index 27d4405607..19101dd5df 100644 --- a/subxt/src/backend/rpc/mod.rs +++ b/subxt/src/backend/rpc/mod.rs @@ -61,6 +61,7 @@ crate::macros::cfg_jsonrpsee! { } crate::macros::cfg_reconnecting_rpc_client! { + mod reconnecting_jsonrpsee_impl; pub use reconnecting_jsonrpsee_ws_client as reconnecting_rpc_client; } diff --git a/subxt/src/backend/rpc/reconnecting_jsonrpsee_impl.rs b/subxt/src/backend/rpc/reconnecting_jsonrpsee_impl.rs new file mode 100644 index 0000000000..da37b267e0 --- /dev/null +++ b/subxt/src/backend/rpc/reconnecting_jsonrpsee_impl.rs @@ -0,0 +1,52 @@ +// Copyright 2019-2023 Parity Technologies (UK) Ltd. +// This file is dual-licensed as Apache-2.0 or GPL-3.0. +// see LICENSE for license details. + +use super::{RawRpcFuture, RawRpcSubscription, RpcClientT}; +use crate::error::RpcError; +use futures::{FutureExt, StreamExt, TryStreamExt}; +use reconnecting_jsonrpsee_ws_client::SubscriptionId; +use serde_json::value::RawValue; + +impl RpcClientT for reconnecting_jsonrpsee_ws_client::Client { + fn request_raw<'a>( + &'a self, + method: &'a str, + params: Option>, + ) -> RawRpcFuture<'a, Box> { + async { + self.request_raw(method.to_string(), params) + .await + .map_err(|e| RpcError::ClientError(Box::new(e))) + } + .boxed() + } + + fn subscribe_raw<'a>( + &'a self, + sub: &'a str, + params: Option>, + unsub: &'a str, + ) -> RawRpcFuture<'a, RawRpcSubscription> { + async { + let sub = self + .subscribe_raw(sub.to_string(), params, unsub.to_string()) + .await + .map_err(|e| RpcError::ClientError(Box::new(e)))?; + + let id = match sub.id() { + SubscriptionId::Num(n) => n.to_string(), + SubscriptionId::Str(s) => s.to_string(), + }; + let stream = sub + .map_err(|e| RpcError::DisconnectedWillReconnect(e.to_string())) + .boxed(); + + Ok(RawRpcSubscription { + stream, + id: Some(id), + }) + } + .boxed() + } +} diff --git a/subxt/src/backend/rpc/rpc_client_t.rs b/subxt/src/backend/rpc/rpc_client_t.rs index 09238e0c93..98d349d17c 100644 --- a/subxt/src/backend/rpc/rpc_client_t.rs +++ b/subxt/src/backend/rpc/rpc_client_t.rs @@ -101,52 +101,3 @@ impl RpcClientT for Box { (**self).subscribe_raw(sub, params, unsub) } } - -#[cfg(feature = "reconnecting-rpc-client")] -impl RpcClientT for reconnecting_jsonrpsee_ws_client::Client { - fn request_raw<'a>( - &'a self, - method: &'a str, - params: Option>, - ) -> RawRpcFuture<'a, Box> { - use futures::FutureExt; - - async { - self.request_raw(method.to_string(), params) - .await - .map_err(|e| RpcError::ClientError(Box::new(e))) - } - .boxed() - } - - fn subscribe_raw<'a>( - &'a self, - sub: &'a str, - params: Option>, - unsub: &'a str, - ) -> RawRpcFuture<'a, RawRpcSubscription> { - use futures::{FutureExt, StreamExt, TryStreamExt}; - use reconnecting_jsonrpsee_ws_client::SubscriptionId; - - async { - let sub = self - .subscribe_raw(sub.to_string(), params, unsub.to_string()) - .await - .map_err(|e| RpcError::ClientError(Box::new(e)))?; - - let id = match sub.id() { - SubscriptionId::Num(n) => n.to_string(), - SubscriptionId::Str(s) => s.to_string(), - }; - let stream = sub - .map_err(|e| RpcError::DisconnectedWillReconnect(e.to_string())) - .boxed(); - - Ok(RawRpcSubscription { - stream, - id: Some(id), - }) - } - .boxed() - } -} From c348bb0e117238d9f192e7b439182bad5dd0a1da Mon Sep 17 00:00:00 2001 From: Niklas Adolfsson Date: Thu, 1 Feb 2024 18:33:56 +0100 Subject: [PATCH 08/10] address grumbles --- subxt/Cargo.toml | 4 +- subxt/examples/reconnecting_rpc_client.rs | 67 ++++++----------------- subxt/src/macros.rs | 4 +- 3 files changed, 21 insertions(+), 54 deletions(-) diff --git a/subxt/Cargo.toml b/subxt/Cargo.toml index 7a8796a03d..84134e664f 100644 --- a/subxt/Cargo.toml +++ b/subxt/Cargo.toml @@ -43,7 +43,7 @@ web = [ ] # Enable this to use the reconnecting rpc client -reconnecting-rpc-client = ["dep:reconnecting-jsonrpsee-ws-client"] +unstable-reconnecting-rpc-client = ["dep:reconnecting-jsonrpsee-ws-client"] # Enable this to use jsonrpsee (allowing for example `OnlineClient::from_url`). jsonrpsee = ["dep:jsonrpsee"] @@ -147,7 +147,7 @@ required-features = ["unstable-light-client", "jsonrpsee", "native"] [[example]] name = "reconnecting_rpc_client" path = "examples/reconnecting_rpc_client.rs" -required-features = ["reconnecting-rpc-client"] +required-features = ["unstable-reconnecting-rpc-client"] [package.metadata.docs.rs] features = ["default", "substrate-compat", "unstable-light-client"] diff --git a/subxt/examples/reconnecting_rpc_client.rs b/subxt/examples/reconnecting_rpc_client.rs index f61b2ea842..fcee4e504b 100644 --- a/subxt/examples/reconnecting_rpc_client.rs +++ b/subxt/examples/reconnecting_rpc_client.rs @@ -11,8 +11,7 @@ use std::time::Duration; use subxt::backend::rpc::reconnecting_rpc_client::{Client, ExponentialBackoff, PingConfig}; use subxt::backend::rpc::RpcClient; use subxt::error::{Error, RpcError}; -use subxt::{tx::TxStatus, OnlineClient, PolkadotConfig}; -use subxt_signer::sr25519::dev; +use subxt::{OnlineClient, PolkadotConfig}; // Generate an interface that we can use from the node's metadata. #[subxt::subxt(runtime_metadata_path = "../artifacts/polkadot_metadata_small.scale")] @@ -46,58 +45,26 @@ async fn main() -> Result<(), Box> { let api: OnlineClient = OnlineClient::from_rpc_client(RpcClient::new(rpc.clone())).await?; - // Build a balance transfer extrinsic. - let dest = dev::bob().public_key().into(); - let balance_transfer_tx = polkadot::tx().balances().transfer_allow_death(dest, 10_000); + // Subscribe to all finalized blocks: + let mut blocks_sub = api.blocks().subscribe_finalized().await?; - // Submit the balance transfer extrinsic from Alice, and wait for it to be successful - // and in a finalized block. We get back the extrinsic events if all is well. - let from = dev::alice(); - - let mut balance_transfer_progress = api - .tx() - .sign_and_submit_then_watch_default(&balance_transfer_tx, &from) - .await?; - - // When the connection is lost, a error RpcError::DisconnectWillReconnect is emitted on the stream. - // In this example we just ignore it and print the why it was disconnected. - // - // If you want to quit on reconnects just use `balance_transfer_progress.wait_for_finalized_success().await` - // and remove the while loop below. - while let Some(status) = balance_transfer_progress.next().await { - match status { - // It's finalized in a block! - Ok(TxStatus::InFinalizedBlock(in_block)) => { - // grab the events and fail if no ExtrinsicSuccess event seen: - let events = in_block.wait_for_success().await?; - // We can look for events (this uses the static interface; we can also iterate - // over them and dynamically decode them): - let transfer_event = events.find_first::()?; - - if transfer_event.is_some() { - println!("Balance transfer success"); - } else { - println!("Failed to find Balances::Transfer Event"); - } - } - // Just log any other status we encounter: - // - // In this example we emit some important status handling for - // here such as Dropped, Invalid etc.... - Ok(_) => { - println!("New status"); - } - // In this example we just ignore when reconnections occurs - // but it's technically possible that we can lose - // messages on the subscription such as `InFinalizedBlock` - // when reconnecting. + // For each block, print a bunch of information about it: + while let Some(block) = blocks_sub.next().await { + let block = match block { + Ok(b) => b, Err(Error::Rpc(RpcError::DisconnectedWillReconnect(e))) => { - println!("{e}"); + println!("The connection was lost: `{}`; reconnecting", e); + continue; } - Err(err) => { - return Err(err.into()); + Err(e) => { + return Err(e.into()); } - } + }; + + let block_number = block.header().number; + let block_hash = block.hash(); + + println!("Block #{block_number} ({block_hash})"); } println!("RPC client reconnected `{}` times", rpc.reconnect_count()); diff --git a/subxt/src/macros.rs b/subxt/src/macros.rs index 73cd89780c..47362f18ca 100644 --- a/subxt/src/macros.rs +++ b/subxt/src/macros.rs @@ -56,8 +56,8 @@ macro_rules! cfg_jsonrpsee_web { macro_rules! cfg_reconnecting_rpc_client { ($($item:item)*) => { $( - #[cfg(all(feature = "reconnecting-rpc-client"))] - #[cfg_attr(docsrs, doc(cfg(feature = "reconnecting-rpc-client")))] + #[cfg(all(feature = "unstable-reconnecting-rpc-client"))] + #[cfg_attr(docsrs, doc(cfg(feature = "unstable-reconnecting-rpc-client")))] $item )* } From 4b6c2969a614b50e2e4176a7a34e82db740fca95 Mon Sep 17 00:00:00 2001 From: Niklas Adolfsson Date: Thu, 8 Feb 2024 11:57:41 +0100 Subject: [PATCH 09/10] update reconnecting-ws-client 0.3 --- Cargo.lock | 105 ++++++++++++++++++++++++++++++++++++++--------- subxt/Cargo.toml | 2 +- 2 files changed, 87 insertions(+), 20 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 0ae76caa04..b6f2ffb56c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2302,10 +2302,20 @@ version = "0.21.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9579d0ca9fb30da026bac2f0f7d9576ec93489aeb7cd4971dd5b4617d82c79b2" dependencies = [ - "jsonrpsee-client-transport", - "jsonrpsee-core", + "jsonrpsee-client-transport 0.21.0", + "jsonrpsee-core 0.21.0", "jsonrpsee-http-client", - "jsonrpsee-types", + "jsonrpsee-types 0.21.0", +] + +[[package]] +name = "jsonrpsee" +version = "0.22.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4a95f7cc23d5fab0cdeeaf6bad8c8f5e7a3aa7f0d211957ea78232b327ab27b0" +dependencies = [ + "jsonrpsee-core 0.22.0", + "jsonrpsee-types 0.22.0", "jsonrpsee-ws-client", ] @@ -2319,7 +2329,28 @@ dependencies = [ "futures-util", "gloo-net", "http", - "jsonrpsee-core", + "jsonrpsee-core 0.21.0", + "pin-project", + "rustls-native-certs 0.7.0", + "rustls-pki-types", + "soketto", + "thiserror", + "tokio", + "tokio-rustls 0.25.0", + "tokio-util", + "tracing", + "url", +] + +[[package]] +name = "jsonrpsee-client-transport" +version = "0.22.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6b1736cfa3845fd9f8f43751f2b8e0e83f7b6081e754502f7d63b6587692cc83" +dependencies = [ + "futures-util", + "http", + "jsonrpsee-core 0.22.0", "pin-project", "rustls-native-certs 0.7.0", "rustls-pki-types", @@ -2345,7 +2376,7 @@ dependencies = [ "futures-timer", "futures-util", "hyper", - "jsonrpsee-types", + "jsonrpsee-types 0.21.0", "pin-project", "rustc-hash", "serde", @@ -2357,6 +2388,29 @@ dependencies = [ "wasm-bindgen-futures", ] +[[package]] +name = "jsonrpsee-core" +version = "0.22.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "82030d038658974732103e623ba2e0abec03bbbe175b39c0a2fafbada60c5868" +dependencies = [ + "anyhow", + "async-lock 3.3.0", + "async-trait", + "beef", + "futures-timer", + "futures-util", + "jsonrpsee-types 0.22.0", + "pin-project", + "rustc-hash", + "serde", + "serde_json", + "thiserror", + "tokio", + "tokio-stream", + "tracing", +] + [[package]] name = "jsonrpsee-http-client" version = "0.21.0" @@ -2366,8 +2420,8 @@ dependencies = [ "async-trait", "hyper", "hyper-rustls", - "jsonrpsee-core", - "jsonrpsee-types", + "jsonrpsee-core 0.21.0", + "jsonrpsee-types 0.21.0", "serde", "serde_json", "thiserror", @@ -2390,16 +2444,29 @@ dependencies = [ "thiserror", ] +[[package]] +name = "jsonrpsee-types" +version = "0.22.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9a48fdc1202eafc51c63e00406575e59493284ace8b8b61aa16f3a6db5d64f1a" +dependencies = [ + "anyhow", + "beef", + "serde", + "serde_json", + "thiserror", +] + [[package]] name = "jsonrpsee-ws-client" -version = "0.21.0" +version = "0.22.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "073c077471e89c4b511fa88b3df9a0f0abdf4a0a2e6683dd2ab36893af87bb2d" +checksum = "c5ce25d70a8e4d3cc574bbc3cad0137c326ad64b194793d5e7bbdd3fa4504181" dependencies = [ "http", - "jsonrpsee-client-transport", - "jsonrpsee-core", - "jsonrpsee-types", + "jsonrpsee-client-transport 0.22.0", + "jsonrpsee-core 0.22.0", + "jsonrpsee-types 0.22.0", "url", ] @@ -3161,12 +3228,12 @@ dependencies = [ [[package]] name = "reconnecting-jsonrpsee-ws-client" -version = "0.2.0" +version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7265f5d33f189e19e7242325ff6874e540aa9c46ae8f44a52412fb59faafbe00" +checksum = "0ea5cf7b021db88f1af45a9b2ecdbe5bc1c5cbebc146632269d572cdd435f5cf" dependencies = [ "futures", - "jsonrpsee", + "jsonrpsee 0.22.0", "serde_json", "thiserror", "tokio", @@ -4483,7 +4550,7 @@ dependencies = [ "hex", "impl-serde", "instant", - "jsonrpsee", + "jsonrpsee 0.21.0", "parity-scale-codec", "primitive-types", "reconnecting-jsonrpsee-ws-client", @@ -4521,7 +4588,7 @@ dependencies = [ "heck", "hex", "indoc", - "jsonrpsee", + "jsonrpsee 0.21.0", "parity-scale-codec", "pretty_assertions", "quote", @@ -4548,7 +4615,7 @@ dependencies = [ "getrandom", "heck", "hex", - "jsonrpsee", + "jsonrpsee 0.21.0", "parity-scale-codec", "proc-macro2", "quote", @@ -4685,7 +4752,7 @@ version = "0.34.0" dependencies = [ "hex", "impl-serde", - "jsonrpsee", + "jsonrpsee 0.21.0", "parity-scale-codec", "serde", "substrate-runner", diff --git a/subxt/Cargo.toml b/subxt/Cargo.toml index 84134e664f..5ff125d191 100644 --- a/subxt/Cargo.toml +++ b/subxt/Cargo.toml @@ -107,7 +107,7 @@ subxt-lightclient = { workspace = true, optional = true, default-features = fals tokio-stream = { workspace = true, optional = true } # Reconnecting jsonrpc ws client -reconnecting-jsonrpsee-ws-client = { version = "0.2", optional = true } +reconnecting-jsonrpsee-ws-client = { version = "0.3", optional = true } # For parsing urls to disallow insecure schemes url = { workspace = true } From 3a8b52071646d234624be561655f48f0605da4c3 Mon Sep 17 00:00:00 2001 From: Niklas Adolfsson Date: Thu, 8 Feb 2024 12:05:06 +0100 Subject: [PATCH 10/10] cleanup error message --- subxt/examples/reconnecting_rpc_client.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/subxt/examples/reconnecting_rpc_client.rs b/subxt/examples/reconnecting_rpc_client.rs index fcee4e504b..b21be899f6 100644 --- a/subxt/examples/reconnecting_rpc_client.rs +++ b/subxt/examples/reconnecting_rpc_client.rs @@ -1,5 +1,5 @@ //! Example to utilize the `reconnecting rpc client` in subxt -//! which hidden behind behind `--feature reconnecting-rpc-client` +//! which hidden behind behind `--feature unstable-reconnecting-rpc-client` //! //! To utilize full logs from the RPC client use: //! `RUST_LOG="jsonrpsee=trace,reconnecting_jsonrpsee_ws_client=trace"` @@ -52,8 +52,8 @@ async fn main() -> Result<(), Box> { while let Some(block) = blocks_sub.next().await { let block = match block { Ok(b) => b, - Err(Error::Rpc(RpcError::DisconnectedWillReconnect(e))) => { - println!("The connection was lost: `{}`; reconnecting", e); + Err(Error::Rpc(RpcError::DisconnectedWillReconnect(err))) => { + println!("{err}"); continue; } Err(e) => {