Skip to content

Commit

Permalink
Add JSON-RPC client support
Browse files Browse the repository at this point in the history
  • Loading branch information
efoerster committed May 18, 2019
1 parent 9ecd325 commit 8ecaa24
Show file tree
Hide file tree
Showing 9 changed files with 361 additions and 105 deletions.
85 changes: 85 additions & 0 deletions jsonrpc/src/client.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
use crate::types::*;
use futures::channel::oneshot;
use futures::future::BoxFuture;
use futures::lock::Mutex;
use futures::prelude::*;
use serde::Serialize;
use serde_json::json;
use std::collections::HashMap;
use std::sync::atomic::{AtomicI32, Ordering};
use std::sync::Arc;

pub type Result<T> = std::result::Result<T, Error>;

pub type FutureResult<'a, T> = BoxFuture<'a, Result<T>>;

pub trait ResponseHandler {
fn handle(&self, response: Response) -> BoxFuture<'_, ()>;
}

pub struct Client<O> {
output: Arc<Mutex<O>>,
request_id: AtomicI32,
queue: Mutex<HashMap<Id, oneshot::Sender<Result<serde_json::Value>>>>,
}

impl<O> Client<O>
where
O: Sink<String> + Unpin + Send,
{
pub fn new(output: Arc<Mutex<O>>) -> Self {
Client {
output,
request_id: AtomicI32::new(0),
queue: Mutex::new(HashMap::new()),
}
}

pub async fn send_request<T: Serialize>(
&self,
method: String,
params: T,
) -> Result<serde_json::Value> {
let id = self.request_id.fetch_add(1, Ordering::SeqCst);
let request = Request::new(method, json!(params), id);

let (sender, receiver) = oneshot::channel();
{
let mut queue = await!(self.queue.lock());
queue.insert(request.id, sender);
}

await!(self.send(Message::Request(request)));
await!(receiver).unwrap()
}

pub async fn send_notification<T: Serialize>(&self, method: String, params: T) {
let notification = Notification::new(method, json!(params));
await!(self.send(Message::Notification(notification)));
}

async fn send(&self, message: Message) {
let json = serde_json::to_string(&message).unwrap();
let mut output = await!(self.output.lock());
await!(output.send(json));
}
}

impl<O> ResponseHandler for Client<O>
where
O: Sink<String> + Unpin + Send,
{
fn handle(&self, response: Response) -> BoxFuture<'_, ()> {
let task = async move {
let id = response.id.expect("Expected response with id");
let mut queue = await!(self.queue.lock());
let sender = queue.remove(&id).expect("Unexpected response received");

let error = response.error.clone();
let result = response.result.ok_or_else(|| error.unwrap());
sender.send(result).unwrap();
};

task.boxed()
}
}
31 changes: 23 additions & 8 deletions jsonrpc/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,35 +1,48 @@
#![feature(await_macro, async_await)]

mod server;
pub mod client;
pub mod server;
mod types;

pub use self::server::*;
pub use self::types::*;
pub use self::{
client::{Client, ResponseHandler},
server::{handle_notification, handle_request, Server},
types::*,
};

use futures::executor::ThreadPool;
use futures::lock::Mutex;
use futures::prelude::*;
use futures::task::*;
use std::sync::Arc;

pub struct MessageHandler<S, I, O> {
pub struct MessageHandler<S, H, I, O> {
server: Arc<S>,
response_handler: Arc<H>,
input: I,
output: Arc<Mutex<O>>,
pool: ThreadPool,
}

impl<S, I, O> MessageHandler<S, I, O>
impl<S, H, I, O> MessageHandler<S, H, I, O>
where
S: Server + Send + Sync + 'static,
H: ResponseHandler + Send + Sync + 'static,
I: Stream<Item = std::io::Result<String>> + Unpin,
O: Sink<String> + Unpin + Send + 'static,
{
pub fn new(server: S, input: I, output: O, pool: ThreadPool) -> Self {
pub fn new(
server: S,
response_handler: Arc<H>,
input: I,
output: Arc<Mutex<O>>,
pool: ThreadPool,
) -> Self {
MessageHandler {
server: Arc::new(server),
response_handler,
input,
output: Arc::new(Mutex::new(output)),
output,
pool,
}
}
Expand Down Expand Up @@ -58,7 +71,9 @@ where
Ok(Message::Notification(notification)) => {
self.server.handle_notification(notification);
}
Ok(Message::Response(response)) => unimplemented!("{:?}", response),
Ok(Message::Response(response)) => {
await!(self.response_handler.handle(response));
}
Err(why) => {
let response = Response::error(why, None);
let json = serde_json::to_string(&response).unwrap();
Expand Down
2 changes: 2 additions & 0 deletions jsonrpc/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ use serde::de::DeserializeOwned;
use serde::Serialize;
use serde_json::json;

pub type Result<T> = std::result::Result<T, String>;

pub trait Server {
fn handle_request(&self, request: Request) -> BoxFuture<'_, Response>;

Expand Down
33 changes: 27 additions & 6 deletions jsonrpc/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ use serde_repr::*;

pub const PROTOCOL_VERSION: &str = "2.0";

pub type Id = i32;

#[derive(Clone, Copy, Debug, Eq, PartialEq, Deserialize_repr, Serialize_repr)]
#[repr(i32)]
pub enum ErrorCode {
Expand Down Expand Up @@ -30,7 +32,18 @@ pub struct Request {
pub jsonrpc: String,
pub method: String,
pub params: serde_json::Value,
pub id: i32,
pub id: Id,
}

impl Request {
pub fn new(method: String, params: serde_json::Value, id: Id) -> Self {
Request {
jsonrpc: PROTOCOL_VERSION.to_owned(),
method,
params,
id,
}
}
}

#[derive(Debug, Clone, PartialEq, Deserialize, Serialize)]
Expand All @@ -43,11 +56,11 @@ pub struct Response {
#[serde(skip_serializing_if = "Option::is_none")]
pub error: Option<Error>,

pub id: Option<i32>,
pub id: Option<Id>,
}

impl Response {
pub fn result(result: serde_json::Value, id: i32) -> Self {
pub fn result(result: serde_json::Value, id: Id) -> Self {
Response {
jsonrpc: PROTOCOL_VERSION.to_owned(),
result: Some(result),
Expand All @@ -56,7 +69,7 @@ impl Response {
}
}

pub fn error(error: Error, id: Option<i32>) -> Self {
pub fn error(error: Error, id: Option<Id>) -> Self {
Response {
jsonrpc: PROTOCOL_VERSION.to_owned(),
result: None,
Expand All @@ -73,12 +86,20 @@ pub struct Notification {
pub params: serde_json::Value,
}

impl Notification {
pub fn new(method: String, params: serde_json::Value) -> Self {
Notification {
jsonrpc: PROTOCOL_VERSION.to_owned(),
method,
params,
}
}
}

#[derive(Debug, Clone, PartialEq, Deserialize, Serialize)]
#[serde(untagged)]
pub enum Message {
Request(Request),
Notification(Notification),
Response(Response),
}

pub type Result<T> = std::result::Result<T, String>;
Loading

0 comments on commit 8ecaa24

Please sign in to comment.