Skip to content

Commit

Permalink
feat: peerCount RPC API (nervosnetwork#257)
Browse files Browse the repository at this point in the history
* Implement RPC API peerCount()

* chore: rewrite with the new PeerCount trait

* chore: revert all old changes in crate network
  • Loading branch information
mohanson authored May 27, 2019
1 parent 4544a42 commit 736ae8c
Show file tree
Hide file tree
Showing 7 changed files with 52 additions and 36 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions components/jsonrpc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ core-pubsub = { path = "../../core/pubsub" }
core-storage = { path = "../../core/storage" }
core-serialization = { path = "../../core/serialization" }
core-crypto = { path = "../../core/crypto" }
core-network = { path = "../../core/network" }

[dev-dependencies]
common-logger = { path = "../../common/logger" }
23 changes: 14 additions & 9 deletions components/jsonrpc/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use serde_json::Value;

use core_crypto::Crypto;
use core_pubsub::channel::pubsub::Receiver;
use core_runtime::network::PeerCount;
use core_runtime::{Executor, Storage, StorageError, TransactionPool};
use core_types::{Address, Block, Hash};

Expand All @@ -19,16 +20,17 @@ use crate::filter::Filter;
use crate::state::AppState;
use crate::util::clean_0x;

fn rpc_handle<E: 'static, T: 'static, S: 'static, C: 'static>(
fn rpc_handle<E: 'static, T: 'static, S: 'static, C: 'static, P: 'static>(
reqjson: web::Json<convention::Call>,
app_state: web::Data<AppState<E, T, S, C>>,
app_state: web::Data<AppState<E, T, S, C, P>>,
_req: HttpRequest,
) -> impl OldFuture<Item = HttpResponse, Error = actix_web::Error>
where
E: Executor,
T: TransactionPool,
S: Storage,
C: Crypto,
P: PeerCount,
{
let fut = async move {
match reqjson.into_inner() {
Expand All @@ -49,15 +51,16 @@ where
fut.boxed().compat()
}

async fn handle_one_request<E: 'static, T: 'static, S: 'static, C: 'static>(
async fn handle_one_request<E: 'static, T: 'static, S: 'static, C: 'static, P: 'static>(
req: convention::Request,
app_state: web::Data<AppState<E, T, S, C>>,
app_state: web::Data<AppState<E, T, S, C, P>>,
) -> convention::Response
where
E: Executor,
T: TransactionPool,
S: Storage,
C: Crypto,
P: PeerCount,
{
let mut result = convention::Response::default();
result.id = req.id.clone();
Expand Down Expand Up @@ -96,8 +99,8 @@ fn get_string(
}

#[allow(clippy::cognitive_complexity)]
async fn rpc_select<E: 'static, T: 'static, S: 'static, C: 'static>(
app_state: AppState<E, T, S, C>,
async fn rpc_select<E: 'static, T: 'static, S: 'static, C: 'static, P: 'static>(
app_state: AppState<E, T, S, C, P>,
method: String,
params: Option<Vec<Value>>,
) -> Result<Value, convention::ErrorData>
Expand All @@ -106,6 +109,7 @@ where
T: TransactionPool,
S: Storage,
C: Crypto,
P: PeerCount,
{
let params = params.unwrap_or_default();
match method.as_str() {
Expand Down Expand Up @@ -349,16 +353,17 @@ where
}

/// Listen and server on address:port which definds on config
pub fn listen<E: 'static, T: 'static, S: 'static, C: 'static>(
pub fn listen<E: 'static, T: 'static, S: 'static, C: 'static, P: 'static>(
config: Config,
app_state: AppState<E, T, S, C>,
app_state: AppState<E, T, S, C, P>,
mut sub_block: Receiver<Block>,
) -> std::io::Result<()>
where
E: Executor,
T: TransactionPool,
S: Storage,
C: Crypto,
P: PeerCount,
{
let mut app_state_clone = app_state.clone();
let fut = async move {
Expand Down Expand Up @@ -386,7 +391,7 @@ where
.data(web::JsonConfig::default().limit(c_payload_size)) // <- limit size of the payload
.route(
web::post()
.to_async(rpc_handle::<E, T, S, C>),
.to_async(rpc_handle::<E, T, S, C, P>),
),
)
})
Expand Down
25 changes: 17 additions & 8 deletions components/jsonrpc/src/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use numext_fixed_uint::U256;
use core_context::{Context, ORIGIN};
use core_crypto::{Crypto, CryptoTransform};
use core_merkle::{self, Merkle, ProofNode};
use core_runtime::network::PeerCount;
use core_runtime::{ExecutionContext, Executor, Storage, TransactionOrigin, TransactionPool};
use core_serialization::AsyncCodec;
use core_types::{Address, Block, BloomRef, Hash, Receipt, SignedTransaction};
Expand Down Expand Up @@ -147,16 +148,17 @@ impl FilterDatabase {
}
}

pub struct AppState<E, T, S, C> {
pub struct AppState<E, T, S, C, P> {
pub filterdb: Arc<RwLock<FilterDatabase>>,

executor: Arc<E>,
transaction_pool: Arc<T>,
storage: Arc<S>,
crypto: Arc<C>,
peer_count: Arc<P>,
}

impl<E, T, S, C> Clone for AppState<E, T, S, C> {
impl<E, T, S, C, P> Clone for AppState<E, T, S, C, P> {
fn clone(&self) -> Self {
Self {
filterdb: Arc::<RwLock<FilterDatabase>>::clone(&self.filterdb),
Expand All @@ -165,21 +167,25 @@ impl<E, T, S, C> Clone for AppState<E, T, S, C> {
transaction_pool: Arc::<T>::clone(&self.transaction_pool),
storage: Arc::<S>::clone(&self.storage),
crypto: Arc::<C>::clone(&self.crypto),
peer_count: Arc::<P>::clone(&self.peer_count),
}
}
}

impl<E, T, S, C> AppState<E, T, S, C>
impl<E, T, S, C, P> AppState<E, T, S, C, P>
where
E: Executor,
T: TransactionPool,
S: Storage,
C: Crypto,
P: PeerCount,
{
pub fn new(
executor: Arc<E>,
transaction_pool: Arc<T>,
storage: Arc<S>,
crypto: Arc<C>,
peer_count: Arc<P>,
) -> Self {
Self {
filterdb: Arc::new(RwLock::new(FilterDatabase::default())),
Expand All @@ -188,17 +194,19 @@ where
transaction_pool,
storage,
crypto,
peer_count,
}
}
}

/// Help functions for rpc APIs.
impl<E, T, S, C> AppState<E, T, S, C>
impl<E, T, S, C, P> AppState<E, T, S, C, P>
where
E: Executor,
T: TransactionPool,
S: Storage,
C: Crypto,
P: PeerCount,
{
pub async fn get_block(&self, number: String) -> RpcResult<Block> {
let h = self.get_height(number).await?;
Expand Down Expand Up @@ -372,12 +380,13 @@ where

/// Async rpc APIs.
/// See ./server.rs::rpc_select to learn about meanings of these APIs.
impl<E, T, S, C> AppState<E, T, S, C>
impl<E, T, S, C, P> AppState<E, T, S, C, P>
where
E: Executor,
T: TransactionPool,
S: Storage,
C: Crypto,
P: PeerCount,
{
pub async fn block_number(&self) -> RpcResult<u64> {
let b = self
Expand Down Expand Up @@ -719,8 +728,7 @@ where
}

pub async fn peer_count(&self) -> RpcResult<u32> {
// TODO. Can't implement at now
Ok(42)
Ok(self.peer_count.peer_count() as u32)
}

pub async fn send_raw_transaction(&self, signed_data: Vec<u8>) -> RpcResult<cita::TxResponse> {
Expand Down Expand Up @@ -771,12 +779,13 @@ where
}

/// A set of functions for FilterDataBase.
impl<E, T, S, C> AppState<E, T, S, C>
impl<E, T, S, C, P> AppState<E, T, S, C, P>
where
E: Executor,
T: TransactionPool,
S: Storage,
C: Crypto,
P: PeerCount,
{
/// Pass a block into FilterDatabase.
pub async fn recv_block(&mut self, block: Block) -> RpcResult<()> {
Expand Down
2 changes: 1 addition & 1 deletion components/jsonrpc/tests/test_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ def call(command):

def test_peer_count():
r = call(f'{prefix} peerCount')
assert r == '0x2a'
assert r == '0x1'


def test_block_number():
Expand Down
2 changes: 1 addition & 1 deletion components/jsonrpc/tests/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

def test_peer_count():
r = client.peer_count()
assert r == 42
assert r == 1


def test_block_number():
Expand Down
34 changes: 17 additions & 17 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ fn start(cfg: &Config) {

let partial_network = PartialService::new(network_config).unwrap();
let outbound = partial_network.outbound();
let _peer_count = partial_network.peer_count();
let peer_count = Arc::new(partial_network.peer_count());

// new tx pool
let tx_pool = Arc::new(HashTransactionPool::new(
Expand All @@ -118,22 +118,6 @@ fn start(cfg: &Config) {
block.header.height,
));

// run json rpc
let mut jrpc_config = components_jsonrpc::Config::default();
jrpc_config.listen = cfg.rpc.address.clone();
jrpc_config.workers = if cfg.rpc.workers != 0 {
cfg.rpc.workers as usize
} else {
cmp::min(2, num_cpus::get())
};
jrpc_config.payload_size = cfg.rpc.payload_size;
let jrpc_state = components_jsonrpc::AppState::new(
Arc::clone(&executor),
Arc::clone(&tx_pool),
Arc::clone(&storage),
Arc::clone(&secp),
);

// new consensus
let privkey = PrivateKey::from_bytes(&hex::decode(cfg.privkey.clone()).unwrap()).unwrap();

Expand Down Expand Up @@ -207,6 +191,22 @@ fn start(cfg: &Config) {
.subscribe::<Block>(PUBSUB_BROADCAST_BLOCK.to_owned())
.unwrap();

// run json rpc
let mut jrpc_config = components_jsonrpc::Config::default();
jrpc_config.listen = cfg.rpc.address.clone();
jrpc_config.workers = if cfg.rpc.workers != 0 {
cfg.rpc.workers as usize
} else {
cmp::min(2, num_cpus::get())
};
jrpc_config.payload_size = cfg.rpc.payload_size;
let jrpc_state = components_jsonrpc::AppState::new(
Arc::clone(&executor),
Arc::clone(&tx_pool),
Arc::clone(&storage),
Arc::clone(&secp),
Arc::clone(&peer_count),
);
if let Err(e) = components_jsonrpc::listen(jrpc_config, jrpc_state, sub_block) {
log::error!("Failed to start jrpc server: {}", e);
};
Expand Down

0 comments on commit 736ae8c

Please sign in to comment.