Skip to content

Commit

Permalink
feat: add reconnecting-rpc-client (#1396)
Browse files Browse the repository at this point in the history
* initial commit

* update to reconnecting-ws-client v0.2

* re-export: reconnecting_rpc_client behind feature

* add helper function for reconnect

* fix nit in example

* simplify code without weird error fmt

* address grumbles

* address grumbles

* update reconnecting-ws-client 0.3

* cleanup error message
  • Loading branch information
niklasad1 authored Feb 8, 2024
1 parent 61ab6b9 commit cb67f94
Show file tree
Hide file tree
Showing 7 changed files with 287 additions and 16 deletions.
131 changes: 120 additions & 11 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 11 additions & 0 deletions subxt/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@ web = [
"instant/wasm-bindgen"
]

# Enable this to use the reconnecting rpc client
unstable-reconnecting-rpc-client = ["dep:reconnecting-jsonrpsee-ws-client"]

# Enable this to use jsonrpsee (allowing for example `OnlineClient::from_url`).
jsonrpsee = ["dep:jsonrpsee"]

Expand Down Expand Up @@ -103,6 +106,9 @@ subxt-lightclient = { workspace = true, optional = true, default-features = fals
# Light client support:
tokio-stream = { workspace = true, optional = true }

# Reconnecting jsonrpc ws client
reconnecting-jsonrpsee-ws-client = { version = "0.3", optional = true }

# For parsing urls to disallow insecure schemes
url = { workspace = true }

Expand Down Expand Up @@ -138,6 +144,11 @@ name = "light_client_parachains"
path = "examples/light_client_parachains.rs"
required-features = ["unstable-light-client", "jsonrpsee", "native"]

[[example]]
name = "reconnecting_rpc_client"
path = "examples/reconnecting_rpc_client.rs"
required-features = ["unstable-reconnecting-rpc-client"]

[package.metadata.docs.rs]
features = ["default", "substrate-compat", "unstable-light-client"]
rustdoc-args = ["--cfg", "docsrs"]
Expand Down
73 changes: 73 additions & 0 deletions subxt/examples/reconnecting_rpc_client.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
//! Example to utilize the `reconnecting rpc client` in subxt
//! which hidden behind behind `--feature unstable-reconnecting-rpc-client`
//!
//! To utilize full logs from the RPC client use:
//! `RUST_LOG="jsonrpsee=trace,reconnecting_jsonrpsee_ws_client=trace"`

#![allow(missing_docs)]

use std::time::Duration;

use subxt::backend::rpc::reconnecting_rpc_client::{Client, ExponentialBackoff, PingConfig};
use subxt::backend::rpc::RpcClient;
use subxt::error::{Error, RpcError};
use subxt::{OnlineClient, PolkadotConfig};

// Generate an interface that we can use from the node's metadata.
#[subxt::subxt(runtime_metadata_path = "../artifacts/polkadot_metadata_small.scale")]
pub mod polkadot {}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
tracing_subscriber::fmt::init();

// Create a new client with with a reconnecting RPC client.
let rpc = Client::builder()
// Reconnect with exponential backoff
//
// This API is "iterator-like" so one could limit it to only
// reconnect x times and then quit.
.retry_policy(ExponentialBackoff::from_millis(100).max_delay(Duration::from_secs(10)))
// Send period WebSocket pings/pongs every 6th second and if it's not ACK:ed in 30 seconds
// then disconnect.
//
// This is just a way to ensure that the connection isn't idle if no message is sent that often
.enable_ws_ping(
PingConfig::new()
.ping_interval(Duration::from_secs(6))
.inactive_limit(Duration::from_secs(30)),
)
// There are other configurations as well that can be found here:
// <https://docs.rs/reconnecting-jsonrpsee-ws-client/latest/reconnecting_jsonrpsee_ws_client/struct.ClientBuilder.html>
.build("ws://localhost:9944".to_string())
.await?;

let api: OnlineClient<PolkadotConfig> =
OnlineClient::from_rpc_client(RpcClient::new(rpc.clone())).await?;

// Subscribe to all finalized blocks:
let mut blocks_sub = api.blocks().subscribe_finalized().await?;

// For each block, print a bunch of information about it:
while let Some(block) = blocks_sub.next().await {
let block = match block {
Ok(b) => b,
Err(Error::Rpc(RpcError::DisconnectedWillReconnect(err))) => {
println!("{err}");
continue;
}
Err(e) => {
return Err(e.into());
}
};

let block_number = block.header().number;
let block_hash = block.hash();

println!("Block #{block_number} ({block_hash})");
}

println!("RPC client reconnected `{}` times", rpc.reconnect_count());

Ok(())
}
8 changes: 6 additions & 2 deletions subxt/src/backend/rpc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,13 @@ crate::macros::cfg_jsonrpsee! {
mod jsonrpsee_impl;
}

crate::macros::cfg_reconnecting_rpc_client! {
mod reconnecting_jsonrpsee_impl;
pub use reconnecting_jsonrpsee_ws_client as reconnecting_rpc_client;
}

mod rpc_client;
mod rpc_client_t;

pub use rpc_client_t::{RawRpcFuture, RawRpcSubscription, RawValue, RpcClientT};

pub use rpc_client::{rpc_params, RpcClient, RpcParams, RpcSubscription};
pub use rpc_client_t::{RawRpcFuture, RawRpcSubscription, RawValue, RpcClientT};
52 changes: 52 additions & 0 deletions subxt/src/backend/rpc/reconnecting_jsonrpsee_impl.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
// Copyright 2019-2023 Parity Technologies (UK) Ltd.
// This file is dual-licensed as Apache-2.0 or GPL-3.0.
// see LICENSE for license details.

use super::{RawRpcFuture, RawRpcSubscription, RpcClientT};
use crate::error::RpcError;
use futures::{FutureExt, StreamExt, TryStreamExt};
use reconnecting_jsonrpsee_ws_client::SubscriptionId;
use serde_json::value::RawValue;

impl RpcClientT for reconnecting_jsonrpsee_ws_client::Client {
fn request_raw<'a>(
&'a self,
method: &'a str,
params: Option<Box<RawValue>>,
) -> RawRpcFuture<'a, Box<RawValue>> {
async {
self.request_raw(method.to_string(), params)
.await
.map_err(|e| RpcError::ClientError(Box::new(e)))
}
.boxed()
}

fn subscribe_raw<'a>(
&'a self,
sub: &'a str,
params: Option<Box<RawValue>>,
unsub: &'a str,
) -> RawRpcFuture<'a, RawRpcSubscription> {
async {
let sub = self
.subscribe_raw(sub.to_string(), params, unsub.to_string())
.await
.map_err(|e| RpcError::ClientError(Box::new(e)))?;

let id = match sub.id() {
SubscriptionId::Num(n) => n.to_string(),
SubscriptionId::Str(s) => s.to_string(),
};
let stream = sub
.map_err(|e| RpcError::DisconnectedWillReconnect(e.to_string()))
.boxed();

Ok(RawRpcSubscription {
stream,
id: Some(id),
})
}
.boxed()
}
}
Loading

0 comments on commit cb67f94

Please sign in to comment.