From 838bca4079986b7d1bac91f8a0682c4d91e77e11 Mon Sep 17 00:00:00 2001 From: Svyatoslav Nikolsky Date: Tue, 2 Aug 2022 13:40:41 +0300 Subject: [PATCH] Use jsonrpsee subscriptions (#1533) * splitted Substrate RPC trait * introduce subscription methods * removed commented code * removed commented code --- bridges/relays/client-substrate/src/client.rs | 83 ++++++------- bridges/relays/client-substrate/src/rpc.rs | 116 +++++++++++++----- 2 files changed, 119 insertions(+), 80 deletions(-) diff --git a/bridges/relays/client-substrate/src/client.rs b/bridges/relays/client-substrate/src/client.rs index ee2b0048fed96..f1ddeeeb0f039 100644 --- a/bridges/relays/client-substrate/src/client.rs +++ b/bridges/relays/client-substrate/src/client.rs @@ -18,7 +18,11 @@ use crate::{ chain::{Chain, ChainWithBalances, TransactionStatusOf}, - rpc::SubstrateClient, + rpc::{ + SubstrateAuthorClient, SubstrateChainClient, SubstrateFrameSystemClient, + SubstrateGrandpaClient, SubstrateStateClient, SubstrateSystemClient, + SubstrateTransactionPaymentClient, + }, ConnectionParams, Error, HashOf, HeaderIdOf, Result, }; @@ -29,8 +33,7 @@ use codec::{Decode, Encode}; use frame_system::AccountInfo; use futures::{SinkExt, StreamExt}; use jsonrpsee::{ - core::{client::SubscriptionClientT, DeserializeOwned}, - types::params::ParamsSer, + core::DeserializeOwned, ws_client::{WsClient as RpcClient, WsClientBuilder as RpcClientBuilder}, }; use num_traits::{Bounded, Zero}; @@ -153,8 +156,7 @@ impl Client { let genesis_hash_client = client.clone(); let genesis_hash = tokio .spawn(async move { - SubstrateClient::::chain_get_block_hash(&*genesis_hash_client, Some(number)) - .await + SubstrateChainClient::::block_hash(&*genesis_hash_client, Some(number)).await }) .await??; @@ -212,7 +214,7 @@ impl Client { /// Returns true if client is connected to at least one peer and is in synced state. pub async fn ensure_synced(&self) -> Result<()> { self.jsonrpsee_execute(|client| async move { - let health = SubstrateClient::::system_health(&*client).await?; + let health = SubstrateSystemClient::::health(&*client).await?; let is_synced = !health.is_syncing && (!health.should_have_peers || health.peers > 0); if is_synced { Ok(()) @@ -231,7 +233,7 @@ impl Client { /// Return hash of the best finalized block. pub async fn best_finalized_header_hash(&self) -> Result { self.jsonrpsee_execute(|client| async move { - Ok(SubstrateClient::::chain_get_finalized_head(&*client).await?) + Ok(SubstrateChainClient::::finalized_head(&*client).await?) }) .await } @@ -252,7 +254,7 @@ impl Client { C::Header: DeserializeOwned, { self.jsonrpsee_execute(|client| async move { - Ok(SubstrateClient::::chain_get_header(&*client, None).await?) + Ok(SubstrateChainClient::::header(&*client, None).await?) }) .await } @@ -260,7 +262,7 @@ impl Client { /// Get a Substrate block from its hash. pub async fn get_block(&self, block_hash: Option) -> Result { self.jsonrpsee_execute(move |client| async move { - Ok(SubstrateClient::::chain_get_block(&*client, block_hash).await?) + Ok(SubstrateChainClient::::block(&*client, block_hash).await?) }) .await } @@ -271,7 +273,7 @@ impl Client { C::Header: DeserializeOwned, { self.jsonrpsee_execute(move |client| async move { - Ok(SubstrateClient::::chain_get_header(&*client, Some(block_hash)).await?) + Ok(SubstrateChainClient::::header(&*client, Some(block_hash)).await?) }) .await } @@ -279,7 +281,7 @@ impl Client { /// Get a Substrate block hash by its number. pub async fn block_hash_by_number(&self, number: C::BlockNumber) -> Result { self.jsonrpsee_execute(move |client| async move { - Ok(SubstrateClient::::chain_get_block_hash(&*client, Some(number)).await?) + Ok(SubstrateChainClient::::block_hash(&*client, Some(number)).await?) }) .await } @@ -297,7 +299,7 @@ impl Client { /// Return runtime version. pub async fn runtime_version(&self) -> Result { self.jsonrpsee_execute(move |client| async move { - Ok(SubstrateClient::::state_runtime_version(&*client).await?) + Ok(SubstrateStateClient::::runtime_version(&*client).await?) }) .await } @@ -341,7 +343,7 @@ impl Client { block_hash: Option, ) -> Result> { self.jsonrpsee_execute(move |client| async move { - Ok(SubstrateClient::::state_get_storage(&*client, storage_key, block_hash).await?) + Ok(SubstrateStateClient::::storage(&*client, storage_key, block_hash).await?) }) .await } @@ -354,7 +356,7 @@ impl Client { self.jsonrpsee_execute(move |client| async move { let storage_key = C::account_info_storage_key(&account); let encoded_account_data = - SubstrateClient::::state_get_storage(&*client, storage_key, None) + SubstrateStateClient::::storage(&*client, storage_key, None) .await? .ok_or(Error::AccountDoesNotExist)?; let decoded_account_data = AccountInfo::>::decode( @@ -371,7 +373,7 @@ impl Client { /// Note: It's the caller's responsibility to make sure `account` is a valid SS58 address. pub async fn next_account_index(&self, account: C::AccountId) -> Result { self.jsonrpsee_execute(move |client| async move { - Ok(SubstrateClient::::system_account_next_index(&*client, account).await?) + Ok(SubstrateFrameSystemClient::::account_next_index(&*client, account).await?) }) .await } @@ -381,7 +383,7 @@ impl Client { /// Note: The given transaction needs to be SCALE encoded beforehand. pub async fn submit_unsigned_extrinsic(&self, transaction: Bytes) -> Result { self.jsonrpsee_execute(move |client| async move { - let tx_hash = SubstrateClient::::author_submit_extrinsic(&*client, transaction) + let tx_hash = SubstrateAuthorClient::::submit_extrinsic(&*client, transaction) .await .map_err(|e| { log::error!(target: "bridge", "Failed to send transaction to {} node: {:?}", C::NAME, e); @@ -418,12 +420,12 @@ impl Client { self.jsonrpsee_execute(move |client| async move { let extrinsic = prepare_extrinsic(best_header_id, transaction_nonce)?; - let tx_hash = SubstrateClient::::author_submit_extrinsic(&*client, extrinsic) + let tx_hash = SubstrateAuthorClient::::submit_extrinsic(&*client, extrinsic) .await .map_err(|e| { - log::error!(target: "bridge", "Failed to send transaction to {} node: {:?}", C::NAME, e); - e - })?; + log::error!(target: "bridge", "Failed to send transaction to {} node: {:?}", C::NAME, e); + e + })?; log::trace!(target: "bridge", "Sent transaction to {} node: {:?}", C::NAME, tx_hash); Ok(tx_hash) }) @@ -445,18 +447,13 @@ impl Client { .jsonrpsee_execute(move |client| async move { let extrinsic = prepare_extrinsic(best_header_id, transaction_nonce)?; let tx_hash = C::Hasher::hash(&extrinsic.0); - let subscription = client - .subscribe( - "author_submitAndWatchExtrinsic", - Some(ParamsSer::Array(vec![jsonrpsee::core::to_json_value(extrinsic) - .map_err(|e| Error::RpcError(e.into()))?])), - "author_unwatchExtrinsic", - ) - .await - .map_err(|e| { - log::error!(target: "bridge", "Failed to send transaction to {} node: {:?}", C::NAME, e); - e - })?; + let subscription = + SubstrateAuthorClient::::submit_and_watch_extrinsic(&*client, extrinsic) + .await + .map_err(|e| { + log::error!(target: "bridge", "Failed to send transaction to {} node: {:?}", C::NAME, e); + e + })?; log::trace!(target: "bridge", "Sent transaction to {} node: {:?}", C::NAME, tx_hash); Ok(subscription) }) @@ -474,7 +471,7 @@ impl Client { /// Returns pending extrinsics from transaction pool. pub async fn pending_extrinsics(&self) -> Result> { self.jsonrpsee_execute(move |client| async move { - Ok(SubstrateClient::::author_pending_extrinsics(&*client).await?) + Ok(SubstrateAuthorClient::::pending_extrinsics(&*client).await?) }) .await } @@ -490,7 +487,7 @@ impl Client { let data = Bytes((TransactionSource::External, transaction, at_block).encode()); let encoded_response = - SubstrateClient::::state_call(&*client, call, data, Some(at_block)).await?; + SubstrateStateClient::::call(&*client, call, data, Some(at_block)).await?; let validity = TransactionValidity::decode(&mut &encoded_response.0[..]) .map_err(Error::ResponseParseFailed)?; @@ -506,7 +503,7 @@ impl Client { ) -> Result> { self.jsonrpsee_execute(move |client| async move { let fee_details = - SubstrateClient::::payment_query_fee_details(&*client, transaction, None) + SubstrateTransactionPaymentClient::::fee_details(&*client, transaction, None) .await?; let inclusion_fee = fee_details .inclusion_fee @@ -540,7 +537,7 @@ impl Client { let data = Bytes(Vec::new()); let encoded_response = - SubstrateClient::::state_call(&*client, call, data, Some(block)).await?; + SubstrateStateClient::::call(&*client, call, data, Some(block)).await?; let authority_list = encoded_response.0; Ok(authority_list) @@ -556,7 +553,7 @@ impl Client { at_block: Option, ) -> Result { self.jsonrpsee_execute(move |client| async move { - SubstrateClient::::state_call(&*client, method, data, at_block) + SubstrateStateClient::::call(&*client, method, data, at_block) .await .map_err(Into::into) }) @@ -570,7 +567,7 @@ impl Client { at_block: C::Hash, ) -> Result { self.jsonrpsee_execute(move |client| async move { - SubstrateClient::::state_prove_storage(&*client, keys, Some(at_block)) + SubstrateStateClient::::prove_storage(&*client, keys, Some(at_block)) .await .map(|proof| { StorageProof::new(proof.proof.into_iter().map(|b| b.0).collect::>()) @@ -583,7 +580,7 @@ impl Client { /// Return `tokenDecimals` property from the set of chain properties. pub async fn token_decimals(&self) -> Result> { self.jsonrpsee_execute(move |client| async move { - let system_properties = SubstrateClient::::system_properties(&*client).await?; + let system_properties = SubstrateSystemClient::::properties(&*client).await?; Ok(system_properties.get("tokenDecimals").and_then(|v| v.as_u64())) }) .await @@ -593,13 +590,7 @@ impl Client { pub async fn subscribe_grandpa_justifications(&self) -> Result> { let subscription = self .jsonrpsee_execute(move |client| async move { - Ok(client - .subscribe( - "grandpa_subscribeJustifications", - None, - "grandpa_unsubscribeJustifications", - ) - .await?) + Ok(SubstrateGrandpaClient::::subscribe_justifications(&*client).await?) }) .await?; let (sender, receiver) = futures::channel::mpsc::channel(MAX_SUBSCRIPTION_CAPACITY); diff --git a/bridges/relays/client-substrate/src/rpc.rs b/bridges/relays/client-substrate/src/rpc.rs index 37c84b7d28f1f..fdba424dd90fd 100644 --- a/bridges/relays/client-substrate/src/rpc.rs +++ b/bridges/relays/client-substrate/src/rpc.rs @@ -16,7 +16,7 @@ //! The most generic Substrate node RPC interface. -use crate::Chain; +use crate::{Chain, TransactionStatusOf}; use jsonrpsee::{core::RpcResult, proc_macros::rpc}; use pallet_transaction_payment_rpc_runtime_api::FeeDetails; @@ -28,52 +28,100 @@ use sp_core::{ use sp_rpc::number::NumberOrHex; use sp_version::RuntimeVersion; -#[rpc(client, client_bounds(C: Chain))] -pub(crate) trait Substrate { - #[method(name = "system_health", param_kind = array)] - async fn system_health(&self) -> RpcResult; - #[method(name = "system_properties", param_kind = array)] - async fn system_properties(&self) -> RpcResult; - #[method(name = "chain_getHeader", param_kind = array)] - async fn chain_get_header(&self, block_hash: Option) -> RpcResult; - #[method(name = "chain_getFinalizedHead", param_kind = array)] - async fn chain_get_finalized_head(&self) -> RpcResult; - #[method(name = "chain_getBlock", param_kind = array)] - async fn chain_get_block(&self, block_hash: Option) -> RpcResult; - #[method(name = "chain_getBlockHash", param_kind = array)] - async fn chain_get_block_hash( - &self, - block_number: Option, - ) -> RpcResult; - #[method(name = "system_accountNextIndex", param_kind = array)] - async fn system_account_next_index(&self, account_id: C::AccountId) -> RpcResult; - #[method(name = "author_submitExtrinsic", param_kind = array)] - async fn author_submit_extrinsic(&self, extrinsic: Bytes) -> RpcResult; - #[method(name = "author_pendingExtrinsics", param_kind = array)] - async fn author_pending_extrinsics(&self) -> RpcResult>; - #[method(name = "state_call", param_kind = array)] - async fn state_call( +/// RPC methods of Substrate `system` namespace, that we are using. +#[rpc(client, client_bounds(C: Chain), namespace = "system")] +pub(crate) trait SubstrateSystem { + /// Return node health. + #[method(name = "health")] + async fn health(&self) -> RpcResult; + /// Return system properties. + #[method(name = "properties")] + async fn properties(&self) -> RpcResult; +} + +/// RPC methods of Substrate `chain` namespace, that we are using. +#[rpc(client, client_bounds(C: Chain), namespace = "chain")] +pub(crate) trait SubstrateChain { + /// Get block hash by its number. + #[method(name = "getBlockHash")] + async fn block_hash(&self, block_number: Option) -> RpcResult; + /// Return block header by its hash. + #[method(name = "getHeader")] + async fn header(&self, block_hash: Option) -> RpcResult; + /// Return best finalized block hash. + #[method(name = "getFinalizedHead")] + async fn finalized_head(&self) -> RpcResult; + /// Return signed block (with justifications) by its hash. + #[method(name = "getBlock")] + async fn block(&self, block_hash: Option) -> RpcResult; +} + +/// RPC methods of Substrate `author` namespace, that we are using. +#[rpc(client, client_bounds(C: Chain), namespace = "author")] +pub(crate) trait SubstrateAuthor { + /// Submit extrinsic to the transaction pool. + #[method(name = "submitExtrinsic")] + async fn submit_extrinsic(&self, extrinsic: Bytes) -> RpcResult; + /// Return vector of pending extrinsics from the transaction pool. + #[method(name = "pendingExtrinsics")] + async fn pending_extrinsics(&self) -> RpcResult>; + /// Submit and watch for extrinsic state. + #[subscription(name = "submitAndWatchExtrinsic", unsubscribe = "unwatchExtrinsic", item = TransactionStatusOf)] + fn submit_and_watch_extrinsic(&self, extrinsic: Bytes); +} + +/// RPC methods of Substrate `state` namespace, that we are using. +#[rpc(client, client_bounds(C: Chain), namespace = "state")] +pub(crate) trait SubstrateState { + /// Get current runtime version. + #[method(name = "getRuntimeVersion")] + async fn runtime_version(&self) -> RpcResult; + /// Call given runtime method. + #[method(name = "call")] + async fn call( &self, method: String, data: Bytes, at_block: Option, ) -> RpcResult; - #[method(name = "state_getStorage", param_kind = array)] - async fn state_get_storage( + /// Get value of the runtime storage. + #[method(name = "getStorage")] + async fn storage( &self, key: StorageKey, at_block: Option, ) -> RpcResult>; - #[method(name = "state_getReadProof", param_kind = array)] - async fn state_prove_storage( + /// Get proof of the runtime storage value. + #[method(name = "getReadProof")] + async fn prove_storage( &self, keys: Vec, hash: Option, ) -> RpcResult>; - #[method(name = "state_getRuntimeVersion", param_kind = array)] - async fn state_runtime_version(&self) -> RpcResult; - #[method(name = "payment_queryFeeDetails", param_kind = array)] - async fn payment_query_fee_details( +} + +/// RPC methods of Substrate `grandpa` namespace, that we are using. +#[rpc(client, client_bounds(C: Chain), namespace = "grandpa")] +pub(crate) trait SubstrateGrandpa { + /// Subscribe to GRANDPA justifications. + #[subscription(name = "subscribeJustifications", unsubscribe = "unsubscribeJustifications", item = Bytes)] + fn subscribe_justifications(&self); +} + +/// RPC methods of Substrate `system` frame pallet, that we are using. +#[rpc(client, client_bounds(C: Chain), namespace = "system")] +pub(crate) trait SubstrateFrameSystem { + /// Return index of next account transaction. + #[method(name = "accountNextIndex")] + async fn account_next_index(&self, account_id: C::AccountId) -> RpcResult; +} + +/// RPC methods of Substrate `pallet_transaction_payment` frame pallet, that we are using. +#[rpc(client, client_bounds(C: Chain), namespace = "payment")] +pub(crate) trait SubstrateTransactionPayment { + /// Query transaction fee details. + #[method(name = "queryFeeDetails")] + async fn fee_details( &self, extrinsic: Bytes, at_block: Option,