diff --git a/subxt/Cargo.toml b/subxt/Cargo.toml index 4faee43718..b44faa476d 100644 --- a/subxt/Cargo.toml +++ b/subxt/Cargo.toml @@ -126,7 +126,7 @@ wasm-bindgen-futures = { workspace = true, optional = true } bitvec = { workspace = true } codec = { workspace = true, features = ["derive", "bit-vec"] } scale-info = { workspace = true, features = ["bit-vec"] } -tokio = { workspace = true, features = ["macros", "time", "rt-multi-thread"] } +tokio = { workspace = true, features = ["macros", "time", "rt-multi-thread", "sync"] } sp-core = { workspace = true } sp-keyring = { workspace = true } sp-runtime = { workspace = true } diff --git a/subxt/src/backend/legacy/mod.rs b/subxt/src/backend/legacy/mod.rs index 44fd606f99..c4145aafc4 100644 --- a/subxt/src/backend/legacy/mod.rs +++ b/subxt/src/backend/legacy/mod.rs @@ -95,28 +95,36 @@ impl Backend for LegacyBackend { keys: Vec>, at: T::Hash, ) -> Result, Error> { - retry(|| async { - let keys = keys.clone(); - let methods = self.methods.clone(); - - // For each key, return it + a future to get the result. - let iter = keys.into_iter().map(move |key| { + fn get_entry( + key: Vec, + at: T::Hash, + methods: LegacyRpcMethods, + ) -> impl Future, Error>> { + retry(move || { let methods = methods.clone(); + let key = key.clone(); async move { let res = methods.state_get_storage(&key, Some(at)).await?; - Ok(res.map(|value| StorageResponse { key, value })) + Ok(res.map(move |value| StorageResponse { key, value })) } - }); + }) + } - let s = stream::iter(iter) - // Resolve the future - .then(|fut| fut) - // Filter any Options out (ie if we didn't find a value at some key we return nothing for it). - .filter_map(|r| future::ready(r.transpose())); + let keys = keys.clone(); + let methods = self.methods.clone(); - Ok(StreamOf(Box::pin(s))) - }) - .await + // For each key, return it + a future to get the result. + let iter = keys + .into_iter() + .map(move |key| get_entry(key, at, methods.clone())); + + let s = stream::iter(iter) + // Resolve the future + .then(|fut| fut) + // Filter any Options out (ie if we didn't find a value at some key we return nothing for it). + .filter_map(|r| future::ready(r.transpose())); + + Ok(StreamOf(Box::pin(s))) } async fn storage_fetch_descendant_keys( diff --git a/subxt/src/backend/legacy/rpc_methods.rs b/subxt/src/backend/legacy/rpc_methods.rs index 8ba1db2f7f..25a9888a1d 100644 --- a/subxt/src/backend/legacy/rpc_methods.rs +++ b/subxt/src/backend/legacy/rpc_methods.rs @@ -455,6 +455,7 @@ pub type EncodedJustification = Vec; /// the RPC call `state_getRuntimeVersion`, #[derive(Debug, Clone, PartialEq, Eq, Deserialize)] #[serde(rename_all = "camelCase")] +#[cfg_attr(test, derive(serde::Serialize))] pub struct RuntimeVersion { /// Version of the runtime specification. A full-node will not attempt to use its native /// runtime in substitute for the on-chain Wasm runtime unless all of `spec_name`, diff --git a/subxt/src/backend/mod.rs b/subxt/src/backend/mod.rs index 6da2faa4ce..b035d34b49 100644 --- a/subxt/src/backend/mod.rs +++ b/subxt/src/backend/mod.rs @@ -325,9 +325,413 @@ pub enum TransactionStatus { /// A response from calls like [`Backend::storage_fetch_values`] or /// [`Backend::storage_fetch_descendant_values`]. +#[cfg_attr(test, derive(serde::Serialize, Clone, PartialEq, Debug))] pub struct StorageResponse { /// The key. pub key: Vec, /// The associated value. pub value: Vec, } + +#[cfg(test)] +mod test { + use super::*; + + mod legacy { + use super::rpc::{RpcClient, RpcClientT}; + use crate::backend::rpc::RawRpcSubscription; + use crate::backend::BackendExt; + use crate::{ + backend::{ + legacy::rpc_methods::Bytes, legacy::rpc_methods::RuntimeVersion, + legacy::LegacyBackend, StorageResponse, + }, + error::RpcError, + }; + use futures::StreamExt; + use serde::Serialize; + use serde_json::value::RawValue; + use std::{ + collections::{HashMap, VecDeque}, + sync::Arc, + }; + use subxt_core::{config::DefaultExtrinsicParams, Config}; + use tokio::sync::{mpsc, Mutex}; + + type RpcResult = Result; + type Item = RpcResult; + + struct MockDataTable { + items: HashMap, VecDeque>, + } + + impl MockDataTable { + fn new() -> Self { + MockDataTable { + items: HashMap::new(), + } + } + + fn from_iter<'a, T: Serialize, I: IntoIterator)>>( + item: I, + ) -> Self { + let mut data = Self::new(); + for (key, item) in item.into_iter() { + data.push(key.into(), item); + } + data + } + + fn push(&mut self, key: Vec, item: RpcResult) { + let item = item.map(|x| serde_json::to_string(&x).unwrap()); + match self.items.entry(key) { + std::collections::hash_map::Entry::Occupied(v) => v.into_mut().push_back(item), + std::collections::hash_map::Entry::Vacant(e) => { + e.insert(VecDeque::from([item])); + } + } + } + + fn pop(&mut self, key: Vec) -> Item { + self.items.get_mut(&key).unwrap().pop_front().unwrap() + } + } + + struct Subscription { + sender: mpsc::Sender>>, + receiver: mpsc::Receiver>>, + } + + impl Subscription { + fn new() -> Self { + let (sender, receiver) = mpsc::channel(32); + Self { sender, receiver } + } + + async fn from_iter< + T: Serialize, + S: IntoIterator>>>, + >( + items: S, + ) -> Self { + let sub = Self::new(); + for i in items { + let i: RpcResult> = i.map(|items| { + items + .into_iter() + .map(|item| item.map(|i| serde_json::to_string(&i).unwrap())) + .collect() + }); + sub.write(i).await + } + sub + } + + async fn read(&mut self) -> RpcResult> { + self.receiver.recv().await.unwrap() + } + + async fn write(&self, items: RpcResult>) { + self.sender.send(items).await.unwrap() + } + } + + struct Data { + request: MockDataTable, + subscription: Subscription, + } + + struct MockRpcClientStorage { + data: Arc>, + } + + impl RpcClientT for MockRpcClientStorage { + fn request_raw<'a>( + &'a self, + method: &'a str, + params: Option>, + ) -> super::rpc::RawRpcFuture<'a, Box> { + Box::pin(async move { + match method { + "state_getStorage" => { + let mut data = self.data.lock().await; + let params = params.map(|p| p.get().to_string()); + let rpc_params = jsonrpsee::types::Params::new(params.as_deref()); + let key: sp_core::Bytes = rpc_params.sequence().next().unwrap(); + let value = data.request.pop(key.0); + value.map(|v| serde_json::value::RawValue::from_string(v).unwrap()) + } + "chain_getBlockHash" => { + let mut data = self.data.lock().await; + let value = data.request.pop("chain_getBlockHash".into()); + value.map(|v| serde_json::value::RawValue::from_string(v).unwrap()) + } + _ => todo!(), + } + }) + } + + fn subscribe_raw<'a>( + &'a self, + _sub: &'a str, + _params: Option>, + _unsub: &'a str, + ) -> super::rpc::RawRpcFuture<'a, super::rpc::RawRpcSubscription> { + Box::pin(async { + let mut data = self.data.lock().await; + let values: RpcResult>>> = + data.subscription.read().await.map(|v| { + v.into_iter() + .map(|v| { + v.map(|v| serde_json::value::RawValue::from_string(v).unwrap()) + }) + .collect::>>>() + }); + values.map(|v| RawRpcSubscription { + stream: futures::stream::iter(v).boxed(), + id: Some("ID".to_string()), + }) + }) + } + } + + // Define dummy config + enum Conf {} + impl Config for Conf { + type Hash = crate::utils::H256; + type AccountId = crate::utils::AccountId32; + type Address = crate::utils::MultiAddress; + type Signature = crate::utils::MultiSignature; + type Hasher = crate::config::substrate::BlakeTwo256; + type Header = crate::config::substrate::SubstrateHeader; + type ExtrinsicParams = DefaultExtrinsicParams; + + type AssetId = u32; + } + + use crate::backend::Backend; + + fn client_runtime_version(num: u32) -> crate::client::RuntimeVersion { + crate::client::RuntimeVersion { + spec_version: num, + transaction_version: num, + } + } + + fn runtime_version(num: u32) -> RuntimeVersion { + RuntimeVersion { + spec_version: num, + transaction_version: num, + other: HashMap::new(), + } + } + + fn bytes(str: &str) -> RpcResult> { + Ok(Some(Bytes(str.into()))) + } + + fn storage_response>, V: Into>>(key: K, value: V) -> StorageResponse + where + Vec: From, + { + StorageResponse { + key: key.into(), + value: value.into(), + } + } + + async fn build_mock_client< + 'a, + T: Serialize, + D: IntoIterator)>, + S: IntoIterator>>>, + >( + table_data: D, + subscription_data: S, + ) -> RpcClient { + let data = Data { + request: MockDataTable::from_iter(table_data), + subscription: Subscription::from_iter(subscription_data).await, + }; + RpcClient::new(MockRpcClientStorage { + data: Arc::new(Mutex::new(data)), + }) + } + + #[tokio::test] + async fn storage_fetch_values() { + let mock_data = vec![ + ("ID1", bytes("Data1")), + ( + "ID2", + Err(RpcError::DisconnectedWillReconnect( + "Reconnecting".to_string(), + )), + ), + ("ID2", bytes("Data2")), + ( + "ID3", + Err(RpcError::DisconnectedWillReconnect( + "Reconnecting".to_string(), + )), + ), + ("ID3", bytes("Data3")), + ]; + let rpc_client = build_mock_client(mock_data, vec![]).await; + let backend: LegacyBackend = LegacyBackend::builder().build(rpc_client); + + // Test + let response = backend + .storage_fetch_values( + ["ID1".into(), "ID2".into(), "ID3".into()].into(), + crate::utils::H256::random(), + ) + .await + .unwrap(); + + let response = response + .map(|x| x.unwrap()) + .collect::>() + .await; + + let expected = vec![ + storage_response("ID1", "Data1"), + storage_response("ID2", "Data2"), + storage_response("ID3", "Data3"), + ]; + + assert_eq!(expected, response) + } + + #[tokio::test] + async fn storage_fetch_value() { + // Setup + let mock_data = [ + ( + "ID1", + Err(RpcError::DisconnectedWillReconnect( + "Reconnecting".to_string(), + )), + ), + ("ID1", bytes("Data1")), + ]; + let rpc_client = build_mock_client(mock_data, vec![]).await; + + // Test + let backend: LegacyBackend = LegacyBackend::builder().build(rpc_client); + let response = backend + .storage_fetch_value("ID1".into(), crate::utils::H256::random()) + .await + .unwrap(); + + let response = response.unwrap(); + assert_eq!("Data1".to_owned(), String::from_utf8(response).unwrap()) + } + + #[tokio::test] + /// This test should cover the logic of the following methods: + /// - `genesis_hash` + /// - `block_header` + /// - `block_body` + /// - `latest_finalized_block` + /// - `current_runtime_version` + /// - `current_runtime_version` + /// - `call` + /// The test covers them because they follow the simple pattern of: + /// ```no_run + /// async fn THE_THING(&self) -> Result { + /// retry(|| ).await + /// } + /// ``` + async fn simple_fetch() { + let hash = crate::utils::H256::random(); + let mock_data = vec![ + ( + "chain_getBlockHash", + Err(RpcError::DisconnectedWillReconnect( + "Reconnecting".to_string(), + )), + ), + ("chain_getBlockHash", Ok(Some(hash))), + ]; + let rpc_client = build_mock_client(mock_data, vec![]).await; + + // Test + let backend: LegacyBackend = LegacyBackend::builder().build(rpc_client); + let response = backend.genesis_hash().await.unwrap(); + + assert_eq!(hash, response) + } + + #[tokio::test] + /// This test should cover the logic of the following methods: + /// - `stream_runtime_version` + /// - `stream_all_block_headers` + /// - `stream_best_block_headers` + /// The test covers them because they follow the simple pattern of: + /// ```no_run + /// async fn stream_the_thing( + /// &self, + /// ) -> Result)>, Error> { + /// let methods = self.methods.clone(); + /// let retry_sub = retry_stream(move || { + /// let methods = methods.clone(); + /// Box::pin(async move { + /// methods.do_the_thing().await? + /// }); + /// Ok(StreamOf(Box::pin(sub))) + /// }) + /// }) + /// .await?; + /// Ok(retry_sub) + /// } + /// ``` + async fn stream_simple() { + let mock_subscription_data = vec![ + Ok(vec![ + Ok(runtime_version(0)), + Err(RpcError::DisconnectedWillReconnect( + "Reconnecting".to_string(), + )), + Ok(runtime_version(1)), + ]), + Ok(vec![ + Err(RpcError::DisconnectedWillReconnect( + "Reconnecting".to_string(), + )), + Ok(runtime_version(2)), + Ok(runtime_version(3)), + ]), + Ok(vec![ + Ok(runtime_version(4)), + Ok(runtime_version(5)), + Err(RpcError::RequestRejected("Reconnecting".to_string())), + ]), + ]; + let rpc_client = build_mock_client(vec![], mock_subscription_data).await; + + // Test + let backend: LegacyBackend = LegacyBackend::builder().build(rpc_client); + + let mut results = backend.stream_runtime_version().await.unwrap(); + let mut expected = VecDeque::from(vec![ + Ok::(client_runtime_version(0)), + Ok(client_runtime_version(4)), + Ok(client_runtime_version(5)), + ]); + + while let Some(res) = results.next().await { + if res.is_ok() { + assert_eq!(expected.pop_front().unwrap().unwrap(), res.unwrap()) + } else { + assert!(matches!( + res, + Err(crate::Error::Rpc(RpcError::RequestRejected(_))) + )) + } + } + assert!(expected.is_empty()); + assert!(results.next().await.is_none()) + } + } +}