feat: add reconnecting-rpc-client #1396

Merged: 14 commits, merged on Feb 8, 2024 (changes shown from 6 commits)
42 changes: 42 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

11 changes: 11 additions & 0 deletions subxt/Cargo.toml
@@ -42,6 +42,9 @@ web = [
"instant/wasm-bindgen"
]

# Enable this to use the reconnecting rpc client
reconnecting-rpc-client = ["dep:reconnecting-jsonrpsee-ws-client"]
Collaborator:
Should we prefix it with "unstable" for now, until we've settled on the semantics (i.e. probably that the client will reconnect but not auto-resubscribe, etc.) and handled the DisconnectedWillReconnect?


# Enable this to use jsonrpsee (allowing for example `OnlineClient::from_url`).
jsonrpsee = ["dep:jsonrpsee"]

@@ -103,6 +106,9 @@ subxt-lightclient = { workspace = true, optional = true, default-features = fals
# Light client support:
tokio-stream = { workspace = true, optional = true }

# Reconnecting jsonrpc ws client
reconnecting-jsonrpsee-ws-client = { version = "0.2", optional = true }

# For parsing urls to disallow insecure schemes
url = { workspace = true }

@@ -138,6 +144,11 @@ name = "light_client_parachains"
path = "examples/light_client_parachains.rs"
required-features = ["unstable-light-client", "jsonrpsee", "native"]

[[example]]
name = "reconnecting_rpc_client"
path = "examples/reconnecting_rpc_client.rs"
required-features = ["reconnecting-rpc-client"]

[package.metadata.docs.rs]
features = ["default", "substrate-compat", "unstable-light-client"]
rustdoc-args = ["--cfg", "docsrs"]
103 changes: 103 additions & 0 deletions subxt/examples/reconnecting_rpc_client.rs
@@ -0,0 +1,103 @@
//! Example of how to use the `reconnecting rpc client` in subxt,
//! which is hidden behind the `--features reconnecting-rpc-client` flag.
//!
//! To get full logs from the RPC client, use:
//! `RUST_LOG="jsonrpsee=trace,reconnecting_jsonrpsee_ws_client=trace"`

#![allow(missing_docs)]

use std::time::Duration;

use subxt::backend::rpc::reconnecting_rpc_client::{Client, ExponentialBackoff, PingConfig};
use subxt::backend::rpc::RpcClient;
use subxt::error::{Error, RpcError};
use subxt::{tx::TxStatus, OnlineClient, PolkadotConfig};
use subxt_signer::sr25519::dev;

// Generate an interface that we can use from the node's metadata.
#[subxt::subxt(runtime_metadata_path = "../artifacts/polkadot_metadata_small.scale")]
pub mod polkadot {}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
tracing_subscriber::fmt::init();

// Create a new client with a reconnecting RPC client.
let rpc = Client::builder()
// Reconnect with exponential backoff.
.retry_policy(ExponentialBackoff::from_millis(100).max_delay(Duration::from_secs(10)))
// Send periodic WebSocket pings/pongs every 6 seconds, and disconnect if a ping
// isn't acknowledged within 30 seconds.
//
// This is just a way to ensure that the connection isn't deemed idle when no other
// messages are sent that often.
.enable_ws_ping(
PingConfig::new()
.ping_interval(Duration::from_secs(6))
.inactive_limit(Duration::from_secs(30)),
)
// There are other configurations as well that can be found here:
// <https://docs.rs/reconnecting-jsonrpsee-ws-client/latest/reconnecting_jsonrpsee_ws_client/struct.ClientBuilder.html>
.build("ws://localhost:9944".to_string())
.await?;

let api: OnlineClient<PolkadotConfig> =
OnlineClient::from_rpc_client(RpcClient::new(rpc.clone())).await?;

// Build a balance transfer extrinsic.
let dest = dev::bob().public_key().into();
let balance_transfer_tx = polkadot::tx().balances().transfer_allow_death(dest, 10_000);

// Submit the balance transfer extrinsic from Alice, and wait for it to be successful
// and in a finalized block. We get back the extrinsic events if all is well.
let from = dev::alice();

let mut balance_transfer_progress = api
.tx()
.sign_and_submit_then_watch_default(&balance_transfer_tx, &from)
.await?;

// When the connection is lost, an `RpcError::DisconnectedWillReconnect` error is emitted
// on the stream; in such scenarios the error will be seen here.
//
// It's possible that messages are lost while reconnecting; if that's acceptable
// you may just ignore that error message.
while let Some(status) = balance_transfer_progress.next().await {
match status {
// It's finalized in a block!
Ok(TxStatus::InFinalizedBlock(in_block)) => {
// grab the events and fail if no ExtrinsicSuccess event seen:
let events = in_block.wait_for_success().await?;
// We can look for events (this uses the static interface; we can also iterate
// over them and dynamically decode them):
let transfer_event = events.find_first::<polkadot::balances::events::Transfer>()?;

if transfer_event.is_some() {
println!("Balance transfer success");
} else {
println!("Failed to find Balances::Transfer Event");
}
}
// Just log any other status we encounter:
//
// In this example we omit handling of some important statuses
// such as Dropped, Invalid, etc.
Ok(_) => {
println!("New status");
}
// In this example we just ignore it when reconnections occur, but it's technically
// possible that we can lose messages on the subscription, such as `InFinalizedBlock`,
// when reconnecting.
Collaborator:
I like this example overall, but when submitting a tx it's particularly tricky to handle: you will lose the subscription anyway, won't you, and won't get any new messages after reconnecting?

So perhaps a better (and simpler, anyway) example is just subscribing to finalized blocks?

I can't remember how your reconnecting client handles subscriptions etc; does it automatically re-subscribe behind the scenes? That area is an interesting one because, on one hand, it might be better if the reconnecting client just reconnects but doesn't try to re-establish anything (so that the Subxt backends can choose how best to restart stuff), but on the other hand having the client do it is simpler at least (though I fear it may not always do what you want). Ultimately we'd need to agree on a "standard" for what is expected from a reconnecting client when it's used in Subxt.
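
For reference, a minimal sketch of that simpler shape (subscribing to finalized blocks and skipping over DisconnectedWillReconnect), assuming the usual subxt blocks API; the function name here is made up for illustration:

use futures::StreamExt;
use subxt::error::{Error, RpcError};
use subxt::{OnlineClient, PolkadotConfig};

// Subscribe to finalized blocks and keep going across reconnections.
async fn watch_finalized(api: OnlineClient<PolkadotConfig>) -> Result<(), Error> {
    let mut blocks = api.blocks().subscribe_finalized().await?;
    while let Some(block) = blocks.next().await {
        match block {
            Ok(block) => println!("finalized block: {:?}", block.hash()),
            // The reconnecting client is already re-establishing the connection;
            // just keep polling the stream.
            Err(Error::Rpc(RpcError::DisconnectedWillReconnect(_))) => continue,
            Err(e) => return Err(e),
        }
    }
    Ok(())
}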

Member Author:

I can't remember how your reconnecting client handles subscriptions etc; does it automatically re-subscribe behind the scenes?

Yes, it re-subscribes to subscriptions and re-transmits pending method calls.

It's just a PITA if one has plenty of storage subscriptions open and has to manually re-subscribe to them.
For rpc-v2 it's a different story, but that wasn't really considered for this use case.

Member Author:

Let's just make it optional and folks can try it out and complain about what's bad :)

Collaborator:

Yeahh for now it's behind a feature flag so we can experiment!

My gut feeling is that before it becomes "stable" in Subxt, we'll probably end up wanting the Backend impls to decide when/how to resubscribe etc themselves (e.g. maybe for TXs we don't try re-sending, but for following new/final blocks we just restart the subscription or whatever).

Member Author (@niklasad1, Jan 26, 2024):

Alex had a few cool ideas about distinguishing between "stateful (submit_and_watch)" and "stateless (subscribe_storage)" subscriptions, but I still think this reconnecting rpc client won't really work that well with the rpc-v2 spec.

One would need to call a few extra methods on top of chainHead_unstable_follow to make it emit interesting stuff...

Member Author (@niklasad1, Jan 26, 2024):

So perhaps a better (and simpler, anyway) example is just subscribing to finalized blocks?

I want to show how to ignore "DisconnectedWillReconnect" and continue if a reconnection occurs. If you just call wait_for_finalized_success it will quit with that error, but we could add a wait_for_finalized_success_and_reconnect... I guess.

Added a few comments to the example; hopefully it's clearer now.
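
A rough sketch of what such a wait_for_finalized_success_and_reconnect-style helper could look like; the name, the use of TxProgress/OnlineClientT/ExtrinsicEvents, and the error returned when the stream ends are illustrative assumptions, not part of this PR:

use subxt::blocks::ExtrinsicEvents;
use subxt::client::OnlineClientT;
use subxt::error::{Error, RpcError};
use subxt::tx::{TxProgress, TxStatus};
use subxt::Config;

// Wait for finalization, but treat `DisconnectedWillReconnect` as non-fatal.
async fn wait_for_finalized_success_and_reconnect<T: Config, C: OnlineClientT<T>>(
    mut progress: TxProgress<T, C>,
) -> Result<ExtrinsicEvents<T>, Error> {
    while let Some(status) = progress.next().await {
        match status {
            Ok(TxStatus::InFinalizedBlock(in_block)) => return in_block.wait_for_success().await,
            // The connection dropped; the client reconnects, so keep waiting for updates.
            Err(Error::Rpc(RpcError::DisconnectedWillReconnect(_))) => continue,
            Err(other) => return Err(other),
            Ok(_) => continue,
        }
    }
    // Illustrative fallback if the status stream ends without finalization.
    Err(Error::Other("transaction progress stream ended".into()))
}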

Member Author:

I think this conversation is a good starting point for another issue on how to deal with reconnections in subxt properly, especially with the rpc v2 spec in mind, which is particularly tricky.

In general, I think just reconnecting (and not re-subscribing and so on) is probably the way to go, but let's try my crate first and collect some feedback on what's the best way forward :)

Collaborator:

I want to show how to ignore "DisconnectedWillReconnect" and continue if a reconnection occurs. If you just call wait_for_finalized_success it will quit with that error, but we could add a wait_for_finalized_success_and_reconnect... I guess.

What I mean is just that it's less code to subscribe to finalized blocks and then show how you can ignore the DisconnectedWillReconnect error. It's probably closer to being a valid thing that we can handle nicely too (we can just start the subscription up again, as your client does atm anyway).

With transactions we probably(?) don't want to re-submit automatically, so showing it as an example sort of hints at a behaviour that I don't think we'll keep. But anyway, it's also just more effort building the tx etc when the point of the example is just handling the DisconnectedWillReconnect error :)

In general, I think just reconnecting (and not re-subscribing and so on) is probably the way to go, but let's try my crate first and collect some feedback on what's the best way forward :)

Yup I agree; I'm happy with this for now since it's behind a feature flag, but before we stabilise it I think we should make the RPC client not try to do anything clever and only reconnect. Instead we can make the backend impls decide how to manage disconnects in follow-up PRs.

Member Author:

What I mean is just that it's less code to subscribe to finalized blocks and then show how you can ignore the DisconnectedWillReconnect error. It's probably closer to being a valid thing that we can handle nicely too (we can just start the subscription up again, as your client does atm anyway).

With transactions we probably(?) don't want to re-submit automatically, so showing it as an example sort of hints at a behaviour that I don't think we'll keep. But anyway, it's also just more effort building the tx etc when the point of the example is just handling the DisconnectedWillReconnect error :)

Got it, makes sense.

Err(Error::Rpc(RpcError::DisconnectedWillReconnect(e))) => {
println!("{:?}", e);
}
Err(err) => {
return Err(err.into());
}
}
}

println!("RPC client reconnected `{}` times", rpc.reconnect_count());

Ok(())
}
7 changes: 5 additions & 2 deletions subxt/src/backend/rpc/mod.rs
@@ -60,9 +60,12 @@ crate::macros::cfg_jsonrpsee! {
mod jsonrpsee_impl;
}

crate::macros::cfg_reconnecting_rpc_client! {
pub use reconnecting_jsonrpsee_ws_client as reconnecting_rpc_client;
}

mod rpc_client;
mod rpc_client_t;

pub use rpc_client_t::{RawRpcFuture, RawRpcSubscription, RawValue, RpcClientT};

pub use rpc_client::{rpc_params, RpcClient, RpcParams, RpcSubscription};
pub use rpc_client_t::{RawRpcFuture, RawRpcSubscription, RawValue, RpcClientT};
91 changes: 91 additions & 0 deletions subxt/src/backend/rpc/rpc_client_t.rs
@@ -101,3 +101,94 @@ impl<T: RpcClientT> RpcClientT for Box<T> {
(**self).subscribe_raw(sub, params, unsub)
}
}

#[cfg(feature = "reconnecting-rpc-client")]
impl RpcClientT for reconnecting_jsonrpsee_ws_client::Client {
Collaborator:

I would probably place this under a src/client/reconnecting_client module. Maybe within a ReconnectingClientRpc(reconnecting_jsonrpsee_ws_client::Client) newtype.

Ideally we would have a ReconnectingClient<T: Config> that has a ReconnectingClient::builder method and implements the OnlineClientT and OfflineClientT traits.
Then we could also expose a reconnect_count method (and other methods of interest) on the ReconnectingClient.
This would tidy up the user API a bit, although the tricky part is that we'd duplicate the reconnecting_jsonrpsee_ws_client::Client::builder methods on ReconnectingClient::builder.

Considering this is under a feature flag at the moment, I'd leave that refactoring as a follow-up, as it would probably need some extra thought.
What do you think?
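
A very rough sketch of the wrapper shape being proposed here; everything in it is hypothetical (including the assumption that the inner client's reconnect_count() returns a usize) and not part of this PR:

use subxt::{Config, OnlineClient};

// Hypothetical wrapper bundling the reconnecting RPC client with the usual OnlineClient.
pub struct ReconnectingClient<T: Config> {
    api: OnlineClient<T>,
    rpc: reconnecting_jsonrpsee_ws_client::Client,
}

impl<T: Config> ReconnectingClient<T> {
    // Expose client-specific info next to the normal OnlineClient API
    // (assumes the inner client's reconnect_count() returns a usize).
    pub fn reconnect_count(&self) -> usize {
        self.rpc.reconnect_count()
    }

    pub fn client(&self) -> &OnlineClient<T> {
        &self.api
    }
}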

Collaborator:

My gut feeling is that for now it should just be a standalone optional RPC client that you can use to instantiate the existing OnlineClient, but at some point, when we've added more special reconnect logic in places etc, we can just use it by default in the current OnlineClient instead of the non-reconnecting client :)

Collaborator:

I agree with the separate module though; something like src/backend/rpc/reconnecting_jsonrpsee_impl.rs maybe to sit next to the current jsonrpsee_impl.rs?
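
A rough sketch of what that split could look like in subxt/src/backend/rpc/mod.rs (hypothetical, mirroring the existing cfg_jsonrpsee! block):

crate::macros::cfg_reconnecting_rpc_client! {
    // The RpcClientT impl for the reconnecting client would live here,
    // next to the existing jsonrpsee_impl.
    mod reconnecting_jsonrpsee_impl;
    pub use reconnecting_jsonrpsee_ws_client as reconnecting_rpc_client;
}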

fn request_raw<'a>(
&'a self,
method: &'a str,
params: Option<Box<RawValue>>,
) -> RawRpcFuture<'a, Box<RawValue>> {
use futures::FutureExt;

async {
self.request_raw(method.to_string(), params)
.await
.map_err(|e| RpcError::ClientError(Box::new(e)))
}
.boxed()
}

fn subscribe_raw<'a>(
&'a self,
sub: &'a str,
params: Option<Box<RawValue>>,
unsub: &'a str,
) -> RawRpcFuture<'a, RawRpcSubscription> {
use futures::{FutureExt, StreamExt, TryStreamExt};
use reconnecting_jsonrpsee_ws_client::{RpcError as ReconnError, SubscriptionId};

// Helper for more human-friendly error messages.
// Workaround for: https://github.com/paritytech/jsonrpsee/issues/1280
fn rpc_error(err: ReconnError) -> String {
match err {
ReconnError::Transport(e) => e.to_string(),
e @ ReconnError::InvalidSubscriptionId => e.to_string(),
ReconnError::InvalidRequestId(e) => e.to_string(),
ReconnError::Custom(e) => e,
ReconnError::RestartNeeded(inner_err) => {
// HACK: jsonrpsee has Error::RestartNeeded(Arc<Error>) and
// we need to visit one child level, because
// RestartNeeded only contains one layer of recursion.
//
// Thus, a nested `RestartNeeded(RestartNeeded(..))`
// is unreachable.
match &*inner_err {
ReconnError::Transport(e) => e.to_string(),
e @ ReconnError::InvalidSubscriptionId => e.to_string(),
ReconnError::InvalidRequestId(e) => e.to_string(),
ReconnError::Custom(e) => e.to_string(),
// These variants are infallible.
ReconnError::RestartNeeded(_)
| ReconnError::RequestTimeout
| ReconnError::MaxSlotsExceeded
| ReconnError::Call(_)
| ReconnError::ParseError(_)
| ReconnError::HttpNotImplemented
| ReconnError::EmptyBatchRequest(_)
| ReconnError::RegisterMethod(_) => unreachable!(),
}
}
// These variants are infallible.
ReconnError::RequestTimeout
| ReconnError::MaxSlotsExceeded
| ReconnError::Call(_)
| ReconnError::ParseError(_)
| ReconnError::HttpNotImplemented
| ReconnError::EmptyBatchRequest(_)
| ReconnError::RegisterMethod(_) => unreachable!(),
}
}

async {
let sub = self
.subscribe_raw(sub.to_string(), params, unsub.to_string())
.await
.map_err(|e| RpcError::ClientError(Box::new(e)))?;

let id = match sub.id() {
SubscriptionId::Num(n) => n.to_string(),
SubscriptionId::Str(s) => s.to_string(),
};
let stream = sub
.map_err(|e| RpcError::DisconnectedWillReconnect(Some(rpc_error(e.0))))
.boxed();

Ok(RawRpcSubscription {
stream,
id: Some(id),
})
}
.boxed()
}
}
12 changes: 10 additions & 2 deletions subxt/src/error/mod.rs
@@ -6,8 +6,6 @@

mod dispatch_error;

use core::fmt::Debug;

crate::macros::cfg_unstable_light_client! {
pub use crate::client::LightClientError;
}
@@ -100,6 +98,13 @@ impl From<std::convert::Infallible> for Error {
}
}

impl Error {
/// Checks whether the error was caused by an RPC re-connection.
pub fn is_disconnected_will_reconnect(&self) -> bool {
matches!(self, Error::Rpc(RpcError::DisconnectedWillReconnect(_)))
}
}

/// An RPC error. Since we are generic over the RPC client that is used,
/// the error is boxed and could be cast.
#[derive(Debug, thiserror::Error)]
@@ -120,6 +125,9 @@ pub enum RpcError {
/// The requested URL is insecure.
#[error("RPC error: insecure URL: {0}")]
InsecureUrl(String),
/// The connection was lost and automatically reconnected.
#[error("RPC error: the connection was lost `{0:?}`; reconnect automatically initiated")]
DisconnectedWillReconnect(Option<String>),
}

impl RpcError {
16 changes: 15 additions & 1 deletion subxt/src/macros.rs
@@ -52,7 +52,21 @@ macro_rules! cfg_jsonrpsee_web {
}
}

pub(crate) use {cfg_feature, cfg_jsonrpsee, cfg_substrate_compat, cfg_unstable_light_client};
#[allow(unused)]
macro_rules! cfg_reconnecting_rpc_client {
($($item:item)*) => {
$(
#[cfg(all(feature = "reconnecting-rpc-client"))]
#[cfg_attr(docsrs, doc(cfg(feature = "reconnecting-rpc-client")))]
$item
)*
}
}

pub(crate) use {
cfg_feature, cfg_jsonrpsee, cfg_reconnecting_rpc_client, cfg_substrate_compat,
cfg_unstable_light_client,
};

// Only used by light-client.
#[allow(unused)]