From 736ae8c7f537a56b01d648cf066f220e47108820 Mon Sep 17 00:00:00 2001 From: Mohanson Date: Mon, 27 May 2019 11:19:07 +0800 Subject: [PATCH] feat: peerCount RPC API (#257) * Implement RPC API peerCount() * chore: rewrite with the new PeerCount trait * chore: revert all old changes in crate network --- Cargo.lock | 1 + components/jsonrpc/Cargo.toml | 1 + components/jsonrpc/src/server.rs | 23 ++++++++++------- components/jsonrpc/src/state.rs | 25 ++++++++++++------ components/jsonrpc/tests/test_cli.py | 2 +- components/jsonrpc/tests/test_client.py | 2 +- src/main.rs | 34 ++++++++++++------------- 7 files changed, 52 insertions(+), 36 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 0063b300e..f5d3bdecf 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -754,6 +754,7 @@ dependencies = [ "core-context 0.1.0", "core-crypto 0.1.0", "core-merkle 0.1.0", + "core-network 0.2.0", "core-pubsub 0.1.0", "core-runtime 0.1.0", "core-serialization 0.1.0", diff --git a/components/jsonrpc/Cargo.toml b/components/jsonrpc/Cargo.toml index 61bbb557b..9bbe54151 100644 --- a/components/jsonrpc/Cargo.toml +++ b/components/jsonrpc/Cargo.toml @@ -37,6 +37,7 @@ core-pubsub = { path = "../../core/pubsub" } core-storage = { path = "../../core/storage" } core-serialization = { path = "../../core/serialization" } core-crypto = { path = "../../core/crypto" } +core-network = { path = "../../core/network" } [dev-dependencies] common-logger = { path = "../../common/logger" } diff --git a/components/jsonrpc/src/server.rs b/components/jsonrpc/src/server.rs index 06f86a8d3..bf94a0f43 100644 --- a/components/jsonrpc/src/server.rs +++ b/components/jsonrpc/src/server.rs @@ -8,6 +8,7 @@ use serde_json::Value; use core_crypto::Crypto; use core_pubsub::channel::pubsub::Receiver; +use core_runtime::network::PeerCount; use core_runtime::{Executor, Storage, StorageError, TransactionPool}; use core_types::{Address, Block, Hash}; @@ -19,9 +20,9 @@ use crate::filter::Filter; use crate::state::AppState; use crate::util::clean_0x; -fn rpc_handle( +fn rpc_handle( reqjson: web::Json, - app_state: web::Data>, + app_state: web::Data>, _req: HttpRequest, ) -> impl OldFuture where @@ -29,6 +30,7 @@ where T: TransactionPool, S: Storage, C: Crypto, + P: PeerCount, { let fut = async move { match reqjson.into_inner() { @@ -49,15 +51,16 @@ where fut.boxed().compat() } -async fn handle_one_request( +async fn handle_one_request( req: convention::Request, - app_state: web::Data>, + app_state: web::Data>, ) -> convention::Response where E: Executor, T: TransactionPool, S: Storage, C: Crypto, + P: PeerCount, { let mut result = convention::Response::default(); result.id = req.id.clone(); @@ -96,8 +99,8 @@ fn get_string( } #[allow(clippy::cognitive_complexity)] -async fn rpc_select( - app_state: AppState, +async fn rpc_select( + app_state: AppState, method: String, params: Option>, ) -> Result @@ -106,6 +109,7 @@ where T: TransactionPool, S: Storage, C: Crypto, + P: PeerCount, { let params = params.unwrap_or_default(); match method.as_str() { @@ -349,9 +353,9 @@ where } /// Listen and server on address:port which definds on config -pub fn listen( +pub fn listen( config: Config, - app_state: AppState, + app_state: AppState, mut sub_block: Receiver, ) -> std::io::Result<()> where @@ -359,6 +363,7 @@ where T: TransactionPool, S: Storage, C: Crypto, + P: PeerCount, { let mut app_state_clone = app_state.clone(); let fut = async move { @@ -386,7 +391,7 @@ where .data(web::JsonConfig::default().limit(c_payload_size)) // <- limit size of the payload .route( web::post() - .to_async(rpc_handle::), + .to_async(rpc_handle::), ), ) }) diff --git a/components/jsonrpc/src/state.rs b/components/jsonrpc/src/state.rs index 47e15aa36..fe5bee915 100644 --- a/components/jsonrpc/src/state.rs +++ b/components/jsonrpc/src/state.rs @@ -15,6 +15,7 @@ use numext_fixed_uint::U256; use core_context::{Context, ORIGIN}; use core_crypto::{Crypto, CryptoTransform}; use core_merkle::{self, Merkle, ProofNode}; +use core_runtime::network::PeerCount; use core_runtime::{ExecutionContext, Executor, Storage, TransactionOrigin, TransactionPool}; use core_serialization::AsyncCodec; use core_types::{Address, Block, BloomRef, Hash, Receipt, SignedTransaction}; @@ -147,16 +148,17 @@ impl FilterDatabase { } } -pub struct AppState { +pub struct AppState { pub filterdb: Arc>, executor: Arc, transaction_pool: Arc, storage: Arc, crypto: Arc, + peer_count: Arc

, } -impl Clone for AppState { +impl Clone for AppState { fn clone(&self) -> Self { Self { filterdb: Arc::>::clone(&self.filterdb), @@ -165,21 +167,25 @@ impl Clone for AppState { transaction_pool: Arc::::clone(&self.transaction_pool), storage: Arc::::clone(&self.storage), crypto: Arc::::clone(&self.crypto), + peer_count: Arc::

::clone(&self.peer_count), } } } -impl AppState +impl AppState where E: Executor, T: TransactionPool, S: Storage, + C: Crypto, + P: PeerCount, { pub fn new( executor: Arc, transaction_pool: Arc, storage: Arc, crypto: Arc, + peer_count: Arc

, ) -> Self { Self { filterdb: Arc::new(RwLock::new(FilterDatabase::default())), @@ -188,17 +194,19 @@ where transaction_pool, storage, crypto, + peer_count, } } } /// Help functions for rpc APIs. -impl AppState +impl AppState where E: Executor, T: TransactionPool, S: Storage, C: Crypto, + P: PeerCount, { pub async fn get_block(&self, number: String) -> RpcResult { let h = self.get_height(number).await?; @@ -372,12 +380,13 @@ where /// Async rpc APIs. /// See ./server.rs::rpc_select to learn about meanings of these APIs. -impl AppState +impl AppState where E: Executor, T: TransactionPool, S: Storage, C: Crypto, + P: PeerCount, { pub async fn block_number(&self) -> RpcResult { let b = self @@ -719,8 +728,7 @@ where } pub async fn peer_count(&self) -> RpcResult { - // TODO. Can't implement at now - Ok(42) + Ok(self.peer_count.peer_count() as u32) } pub async fn send_raw_transaction(&self, signed_data: Vec) -> RpcResult { @@ -771,12 +779,13 @@ where } /// A set of functions for FilterDataBase. -impl AppState +impl AppState where E: Executor, T: TransactionPool, S: Storage, C: Crypto, + P: PeerCount, { /// Pass a block into FilterDatabase. pub async fn recv_block(&mut self, block: Block) -> RpcResult<()> { diff --git a/components/jsonrpc/tests/test_cli.py b/components/jsonrpc/tests/test_cli.py index 849c4168b..5cf0bfe31 100644 --- a/components/jsonrpc/tests/test_cli.py +++ b/components/jsonrpc/tests/test_cli.py @@ -37,7 +37,7 @@ def call(command): def test_peer_count(): r = call(f'{prefix} peerCount') - assert r == '0x2a' + assert r == '0x1' def test_block_number(): diff --git a/components/jsonrpc/tests/test_client.py b/components/jsonrpc/tests/test_client.py index a154a7fef..5113c70e9 100644 --- a/components/jsonrpc/tests/test_client.py +++ b/components/jsonrpc/tests/test_client.py @@ -16,7 +16,7 @@ def test_peer_count(): r = client.peer_count() - assert r == 42 + assert r == 1 def test_block_number(): diff --git a/src/main.rs b/src/main.rs index 61c64b01a..5ceeeaed2 100644 --- a/src/main.rs +++ b/src/main.rs @@ -105,7 +105,7 @@ fn start(cfg: &Config) { let partial_network = PartialService::new(network_config).unwrap(); let outbound = partial_network.outbound(); - let _peer_count = partial_network.peer_count(); + let peer_count = Arc::new(partial_network.peer_count()); // new tx pool let tx_pool = Arc::new(HashTransactionPool::new( @@ -118,22 +118,6 @@ fn start(cfg: &Config) { block.header.height, )); - // run json rpc - let mut jrpc_config = components_jsonrpc::Config::default(); - jrpc_config.listen = cfg.rpc.address.clone(); - jrpc_config.workers = if cfg.rpc.workers != 0 { - cfg.rpc.workers as usize - } else { - cmp::min(2, num_cpus::get()) - }; - jrpc_config.payload_size = cfg.rpc.payload_size; - let jrpc_state = components_jsonrpc::AppState::new( - Arc::clone(&executor), - Arc::clone(&tx_pool), - Arc::clone(&storage), - Arc::clone(&secp), - ); - // new consensus let privkey = PrivateKey::from_bytes(&hex::decode(cfg.privkey.clone()).unwrap()).unwrap(); @@ -207,6 +191,22 @@ fn start(cfg: &Config) { .subscribe::(PUBSUB_BROADCAST_BLOCK.to_owned()) .unwrap(); + // run json rpc + let mut jrpc_config = components_jsonrpc::Config::default(); + jrpc_config.listen = cfg.rpc.address.clone(); + jrpc_config.workers = if cfg.rpc.workers != 0 { + cfg.rpc.workers as usize + } else { + cmp::min(2, num_cpus::get()) + }; + jrpc_config.payload_size = cfg.rpc.payload_size; + let jrpc_state = components_jsonrpc::AppState::new( + Arc::clone(&executor), + Arc::clone(&tx_pool), + Arc::clone(&storage), + Arc::clone(&secp), + Arc::clone(&peer_count), + ); if let Err(e) = components_jsonrpc::listen(jrpc_config, jrpc_state, sub_block) { log::error!("Failed to start jrpc server: {}", e); };