Skip to content

Commit

Permalink
Refactor JSON-RPC layer
Browse files Browse the repository at this point in the history
  • Loading branch information
efoerster committed May 9, 2019
1 parent 9c99f45 commit db96b9e
Show file tree
Hide file tree
Showing 7 changed files with 103 additions and 77 deletions.
64 changes: 64 additions & 0 deletions jsonrpc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,67 @@ mod types;

pub use self::server::*;
pub use self::types::*;

use futures::executor::ThreadPool;
use futures::lock::Mutex;
use futures::prelude::*;
use futures::task::*;
use std::sync::Arc;

pub struct MessageHandler<S, I, O> {
server: Arc<S>,
input: I,
output: Arc<Mutex<O>>,
pool: ThreadPool,
}

impl<S, I, O> MessageHandler<S, I, O>
where
S: Server + Send + Sync + 'static,
I: Stream<Item = std::io::Result<String>> + Unpin,
O: Sink<String> + Unpin + Send + 'static,
{
pub fn new(server: S, input: I, output: O, pool: ThreadPool) -> Self {
MessageHandler {
server: Arc::new(server),
input,
output: Arc::new(Mutex::new(output)),
pool,
}
}

pub async fn listen(&mut self) {
while let Some(json) = await!(self.input.next()) {
let message = serde_json::from_str(&json.expect("")).map_err(|_| Error {
code: ErrorCode::ParseError,
message: "Could not parse the input".to_owned(),
data: serde_json::Value::Null,
});

match message {
Ok(Message::Request(request)) => {
let server = Arc::clone(&self.server);
let output = Arc::clone(&self.output);
let handler = async move {
let response = await!(server.handle_request(request));
let json = serde_json::to_string(&response).unwrap();
let mut output = await!(output.lock());
await!(output.send(json));
};

self.pool.spawn(handler).unwrap();
}
Ok(Message::Notification(notification)) => {
self.server.handle_notification(notification);
}
Ok(Message::Response(_)) => unimplemented!(),
Err(why) => {
let response = Response::new(serde_json::Value::Null, Some(why), None);
let json = serde_json::to_string(&response).unwrap();
let mut output = await!(self.output.lock());
await!(output.send(json));
}
}
}
}
}
7 changes: 7 additions & 0 deletions jsonrpc/src/server.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,16 @@
use crate::types::*;
use futures::future::BoxFuture;
use futures::prelude::*;
use serde::de::DeserializeOwned;
use serde::Serialize;
use serde_json::json;

pub trait Server {
fn handle_request(&self, request: Request) -> BoxFuture<'_, Response>;

fn handle_notification(&self, notification: Notification);
}

const DESERIALIZE_OBJECT_ERROR: &str = "Could not deserialize parameter object";

pub async fn handle_request<'a, H, F, I, O>(request: Request, handler: H) -> Response
Expand Down
39 changes: 22 additions & 17 deletions jsonrpc_derive/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,33 +43,38 @@ pub fn jsonrpc_server(
}

let self_ty = &impl_.self_ty;
let items = &impl_.items;
let result = quote! {
impl #self_ty {
#(#items)*

pub async fn handle_request(&self, request: jsonrpc::Request) -> jsonrpc::Response {
match request.method.as_str() {
#(#requests),*,
_ => {
let error = jsonrpc::Error {
code: jsonrpc::ErrorCode::MethodNotFound,
message: String::from("Method not found"),
data: serde_json::Value::Null,
};

jsonrpc::Response::new(serde_json::Value::Null, Some(error), Some(request.id))
impl jsonrpc::Server for #self_ty {
fn handle_request(&self, request: jsonrpc::Request)
-> futures::future::BoxFuture<'_, jsonrpc::Response> {
use futures::prelude::*;
let handler = async move {
match request.method.as_str() {
#(#requests),*,
_ => {
let error = jsonrpc::Error {
code: jsonrpc::ErrorCode::MethodNotFound,
message: String::from("Method not found"),
data: serde_json::Value::Null,
};

jsonrpc::Response::new(serde_json::Value::Null, Some(error), Some(request.id))
}
}
}
};

handler.boxed()
}

pub fn handle_notification(&self, notification: jsonrpc::Notification) {
fn handle_notification(&self, notification: jsonrpc::Notification) {
match notification.method.as_str() {
#(#notifications),*,
_ => log::warn!("{}: {}", "Method not found", notification.method),
}
}
}

#impl_
};

result.into()
Expand Down
File renamed without changes.
2 changes: 1 addition & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
#![recursion_limit = "128"]

pub mod build;
pub mod codec;
pub mod completion;
pub mod definition;
pub mod feature;
Expand All @@ -10,7 +11,6 @@ pub mod formatting;
pub mod highlight;
pub mod hover;
pub mod link;
pub mod lsp;
pub mod metadata;
pub mod reference;
pub mod rename;
Expand Down
56 changes: 0 additions & 56 deletions src/lsp/mod.rs

This file was deleted.

12 changes: 9 additions & 3 deletions src/main.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
#![feature(await_macro, async_await)]

use clap::*;
use futures::compat::*;
use futures::executor::*;
use futures::prelude::*;
use texlab::lsp;
use jsonrpc::MessageHandler;
use texlab::codec::LspCodec;
use texlab::server::LatexLspServer;
use tokio::codec::FramedRead;
use tokio_codec::FramedWrite;
use tokio_stdin_stdout;

fn main() {
Expand Down Expand Up @@ -41,6 +45,8 @@ async fn run(pool: ThreadPool) {
let server = LatexLspServer::new();
let stdin = tokio_stdin_stdout::stdin(0);
let stdout = tokio_stdin_stdout::stdout(0);

await!(lsp::listen(server, stdin, stdout, pool));
let input = FramedRead::new(stdin, LspCodec).compat();
let output = FramedWrite::new(stdout, LspCodec).sink_compat();
let mut handler = MessageHandler::new(server, input, output, pool);
await!(handler.listen());
}

0 comments on commit db96b9e

Please sign in to comment.