Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Commit

Permalink
jsonrpsee: add rpc handlers back (#10245)
Browse files Browse the repository at this point in the history
* add back RpcHandlers

* cargo fmt

* fix docs
  • Loading branch information
niklasad1 authored Nov 12, 2021
1 parent 32e16e7 commit 4894b27
Show file tree
Hide file tree
Showing 7 changed files with 84 additions and 35 deletions.
12 changes: 9 additions & 3 deletions bin/node/test-runner-example/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,14 +93,20 @@ mod tests {
#[test]
fn test_runner() {
let tokio_runtime = build_runtime().unwrap();
let (task_manager, client, pool, command_sink, backend) =
let (rpc, task_manager, client, pool, command_sink, backend) =
client_parts::<NodeTemplateChainInfo>(ConfigOrChainSpec::ChainSpec(
Box::new(development_config()),
tokio_runtime.handle().clone(),
))
.unwrap();
let node =
Node::<NodeTemplateChainInfo>::new(task_manager, client, pool, command_sink, backend);
let node = Node::<NodeTemplateChainInfo>::new(
rpc,
task_manager,
client,
pool,
command_sink,
backend,
);

tokio_runtime.block_on(async {
// seals blocks
Expand Down
14 changes: 12 additions & 2 deletions client/rpc/src/testing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,11 +52,21 @@ impl Spawn for TaskExecutor {
}
}
impl SpawnNamed for TaskExecutor {
fn spawn_blocking(&self, _name: &'static str, future: futures::future::BoxFuture<'static, ()>) {
fn spawn_blocking(
&self,
_name: &'static str,
_group: Option<&'static str>,
future: futures::future::BoxFuture<'static, ()>,
) {
EXECUTOR.spawn_ok(future);
}

fn spawn(&self, _name: &'static str, future: futures::future::BoxFuture<'static, ()>) {
fn spawn(
&self,
_name: &'static str,
_group: Option<&'static str>,
future: futures::future::BoxFuture<'static, ()>,
) {
EXECUTOR.spawn_ok(future);
}
}
Expand Down
23 changes: 11 additions & 12 deletions client/service/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use crate::{
config::{Configuration, KeystoreConfig, PrometheusConfig, TransactionStorageMode},
error::Error,
metrics::MetricsService,
start_rpc_servers, SpawnTaskHandle, TaskManager, TransactionPoolAdapter,
start_rpc_servers, RpcHandlers, SpawnTaskHandle, TaskManager, TransactionPoolAdapter,
};
use futures::{channel::oneshot, future::ready, FutureExt, StreamExt};
use jsonrpsee::RpcModule;
Expand Down Expand Up @@ -323,7 +323,7 @@ where
}

/// Parameters to pass into `build`.
pub struct SpawnTasksParams<'a, TBl: BlockT, TCl, TExPool, Backend> {
pub struct SpawnTasksParams<'a, TBl: BlockT, TCl, TExPool, TRpc, Backend> {
/// The service configuration.
pub config: Configuration,
/// A shared client returned by `new_full_parts`/`new_light_parts`.
Expand All @@ -340,7 +340,7 @@ pub struct SpawnTasksParams<'a, TBl: BlockT, TCl, TExPool, Backend> {
pub transaction_pool: Arc<TExPool>,
/// Builds additional [`RpcModule`]s that should be added to the server
pub rpc_builder:
Box<dyn FnOnce(DenyUnsafe, SubscriptionTaskExecutor) -> Result<RpcModule<()>, Error>>,
Box<dyn Fn(DenyUnsafe, SubscriptionTaskExecutor) -> Result<RpcModule<TRpc>, Error>>,
/// An optional, shared remote blockchain instance. Used for light clients.
pub remote_blockchain: Option<Arc<dyn RemoteBlockchain<TBl>>>,
/// A shared network instance.
Expand Down Expand Up @@ -384,9 +384,9 @@ where
}

/// Spawn the tasks that are required to run a node.
pub fn spawn_tasks<TBl, TBackend, TExPool, TCl>(
params: SpawnTasksParams<TBl, TCl, TExPool, TBackend>,
) -> Result<(), Error>
pub fn spawn_tasks<TBl, TBackend, TExPool, TRpc, TCl>(
params: SpawnTasksParams<TBl, TCl, TExPool, TRpc, TBackend>,
) -> Result<RpcHandlers, Error>
where
TCl: ProvideRuntimeApi<TBl>
+ HeaderMetadata<TBl, Error = sp_blockchain::Error>
Expand Down Expand Up @@ -494,11 +494,12 @@ where
system_rpc_tx.clone(),
&config,
backend.offchain_storage(),
rpc_builder,
&*rpc_builder,
)
};

let rpc = start_rpc_servers(&config, gen_rpc_module)?;
let rpc_handlers = RpcHandlers(Arc::new(gen_rpc_module(sc_rpc::DenyUnsafe::No)?.into()));

// Spawn informant task
spawn_handle.spawn(
Expand All @@ -514,7 +515,7 @@ where

task_manager.keep_alive((config.base_path, rpc));

Ok(())
Ok(rpc_handlers)
}

async fn transaction_notifications<TBl, TExPool>(
Expand Down Expand Up @@ -571,7 +572,7 @@ fn init_telemetry<TBl: BlockT, TCl: BlockBackend<TBl>>(
Ok(telemetry.handle())
}

fn gen_rpc_module<TBl, TBackend, TCl, TExPool>(
fn gen_rpc_module<TBl, TBackend, TCl, TRpc, TExPool>(
deny_unsafe: DenyUnsafe,
spawn_handle: SpawnTaskHandle,
client: Arc<TCl>,
Expand All @@ -580,9 +581,7 @@ fn gen_rpc_module<TBl, TBackend, TCl, TExPool>(
system_rpc_tx: TracingUnboundedSender<sc_rpc::system::Request<TBl>>,
config: &Configuration,
offchain_storage: Option<<TBackend as sc_client_api::backend::Backend<TBl>>::OffchainStorage>,
rpc_builder: Box<
dyn FnOnce(DenyUnsafe, SubscriptionTaskExecutor) -> Result<RpcModule<()>, Error>,
>,
rpc_builder: &(dyn Fn(DenyUnsafe, SubscriptionTaskExecutor) -> Result<RpcModule<TRpc>, Error>),
) -> Result<RpcModule<()>, Error>
where
TBl: BlockT,
Expand Down
41 changes: 31 additions & 10 deletions client/service/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ use log::{debug, error, warn};
use sc_client_api::{blockchain::HeaderBackend, BlockchainEvents};
use sc_network::PeerId;
use sc_utils::mpsc::TracingUnboundedReceiver;
use serde::Serialize;
use sp_runtime::{
generic::BlockId,
traits::{Block as BlockT, Header as HeaderT},
Expand Down Expand Up @@ -79,9 +80,28 @@ pub use task_manager::{SpawnTaskHandle, TaskManager, DEFAULT_GROUP_NAME};

const DEFAULT_PROTOCOL_ID: &str = "sup";

/// Dummy RPC handler type.
// TODO(niklasad1): replace this to do perform in-memory rpc request.
pub type RpcHandlers = ();
/// RPC handlers that can perform RPC queries.
#[derive(Clone)]
pub struct RpcHandlers(Arc<RpcModule<()>>);

impl RpcHandlers {
/// Starts an RPC query.
///
/// The query is passed as a string and must be a JSON text similar to what an HTTP client
/// would for example send.
///
/// Returns a `Future` that contains the optional response.
//
// TODO(niklasad1): support subscriptions?!.
pub async fn rpc_query<T: Serialize>(&self, method: &str, params: Vec<T>) -> Option<String> {
self.0.call_with(method, params).await
}

/// Provides access to the underlying `RpcModule`
pub fn handle(&self) -> Arc<RpcModule<()>> {
self.0.clone()
}
}

/// An incomplete set of chain components, but enough to run the chain ops subcommands.
pub struct PartialComponents<Client, Backend, SelectChain, ImportQueue, TransactionPool, Other> {
Expand Down Expand Up @@ -380,16 +400,16 @@ where
fn import(&self, transaction: B::Extrinsic) -> TransactionImportFuture {
if !self.imports_external_transactions {
debug!("Transaction rejected");
return Box::pin(futures::future::ready(TransactionImport::None))
return Box::pin(futures::future::ready(TransactionImport::None));
}

let encoded = transaction.encode();
let uxt = match Decode::decode(&mut &encoded[..]) {
Ok(uxt) => uxt,
Err(e) => {
debug!("Transaction invalid: {:?}", e);
return Box::pin(futures::future::ready(TransactionImport::Bad))
},
return Box::pin(futures::future::ready(TransactionImport::Bad));
}
};

let best_block_id = BlockId::hash(self.client.info().best_hash);
Expand All @@ -403,18 +423,19 @@ where
match import_future.await {
Ok(_) => TransactionImport::NewGood,
Err(e) => match e.into_pool_error() {
Ok(sc_transaction_pool_api::error::Error::AlreadyImported(_)) =>
TransactionImport::KnownGood,
Ok(sc_transaction_pool_api::error::Error::AlreadyImported(_)) => {
TransactionImport::KnownGood
}
Ok(e) => {
debug!("Error adding transaction to the pool: {:?}", e);
TransactionImport::Bad
},
}
Err(e) => {
debug!("Error converting pool error: {:?}", e);
// it is not bad at least, just some internal node logic error, so peer is
// innocent.
TransactionImport::KnownGood
},
}
},
}
})
Expand Down
2 changes: 0 additions & 2 deletions frame/bags-list/remote-tests/src/sanity_check.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,6 @@ use frame_support::{
};
use remote_externalities::{Builder, Mode, OnlineConfig};
use sp_runtime::{traits::Block as BlockT, DeserializeOwned};
use sp_std::prelude::*;

/// Execute the sanity check of the bags-list.
pub async fn execute<Runtime: crate::RuntimeT, Block: BlockT + DeserializeOwned>(
currency_unit: u64,
Expand Down
12 changes: 6 additions & 6 deletions test-utils/test-runner/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ use sp_transaction_pool::runtime_api::TaggedTransactionQueue;
use std::{str::FromStr, sync::Arc};

type ClientParts<T> = (
Arc<RpcModule<()>>,
TaskManager,
Arc<
TFullClient<
Expand Down Expand Up @@ -187,13 +188,11 @@ where
let rpc_sink = command_sink.clone();

let rpc_builder = Box::new(move |_, _| {
let seal = ManualSeal::new(rpc_sink).into_rpc();
let mut module = RpcModule::new(());
module.merge(seal).expect("only one module; qed");
Ok(module)
let seal = ManualSeal::new(rpc_sink.clone()).into_rpc();
Ok(seal)
});

let _rpc_handlers = {
let rpc_handlers = {
let params = SpawnTasksParams {
config,
client: client.clone(),
Expand Down Expand Up @@ -241,6 +240,7 @@ where
.spawn("manual-seal", None, authorship_future);

network_starter.start_network();
let rpc_handler = rpc_handlers.handle();

Ok((task_manager, client, transaction_pool, command_sink, backend))
Ok((rpc_handler, task_manager, client, transaction_pool, command_sink, backend))
}
15 changes: 15 additions & 0 deletions test-utils/test-runner/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use futures::{
channel::{mpsc, oneshot},
FutureExt, SinkExt,
};
use jsonrpsee::RpcModule;
use manual_seal::EngineCommand;
use sc_client_api::{
backend::{self, Backend},
Expand All @@ -46,6 +47,8 @@ use sp_state_machine::Ext;
/// the node process is dropped when this struct is dropped
/// also holds logs from the process.
pub struct Node<T: ChainInfo> {
/// rpc handler for communicating with the node over rpc.
rpc_handler: Arc<RpcModule<()>>,
/// handle to the running node.
task_manager: Option<TaskManager>,
/// client instance
Expand Down Expand Up @@ -82,6 +85,7 @@ where
{
/// Creates a new node.
pub fn new(
rpc_handler: Arc<RpcModule<()>>,
task_manager: TaskManager,
client: Arc<
TFullClient<T::Block, T::RuntimeApi, NativeElseWasmExecutor<T::ExecutorDispatch>>,
Expand All @@ -101,6 +105,7 @@ where
backend: Arc<TFullBackend<T::Block>>,
) -> Self {
Self {
rpc_handler,
task_manager: Some(task_manager),
client: client.clone(),
pool,
Expand All @@ -110,6 +115,16 @@ where
}
}

/// Returns a reference to the rpc handlers, use this to send rpc requests.
/// eg
/// ```ignore
/// let response = node.rpc_handler()
/// .call_with(""engine_createBlock", vec![true, true]);
/// ```
pub fn rpc_handler(&self) -> Arc<RpcModule<()>> {
self.rpc_handler.clone()
}

/// Return a reference to the Client
pub fn client(
&self,
Expand Down

0 comments on commit 4894b27

Please sign in to comment.