diff --git a/jsonrpc/src/lib.rs b/jsonrpc/src/lib.rs index ac025fb26..760047fa3 100644 --- a/jsonrpc/src/lib.rs +++ b/jsonrpc/src/lib.rs @@ -5,3 +5,67 @@ mod types; pub use self::server::*; pub use self::types::*; + +use futures::executor::ThreadPool; +use futures::lock::Mutex; +use futures::prelude::*; +use futures::task::*; +use std::sync::Arc; + +pub struct MessageHandler { + server: Arc, + input: I, + output: Arc>, + pool: ThreadPool, +} + +impl MessageHandler +where + S: Server + Send + Sync + 'static, + I: Stream> + Unpin, + O: Sink + Unpin + Send + 'static, +{ + pub fn new(server: S, input: I, output: O, pool: ThreadPool) -> Self { + MessageHandler { + server: Arc::new(server), + input, + output: Arc::new(Mutex::new(output)), + pool, + } + } + + pub async fn listen(&mut self) { + while let Some(json) = await!(self.input.next()) { + let message = serde_json::from_str(&json.expect("")).map_err(|_| Error { + code: ErrorCode::ParseError, + message: "Could not parse the input".to_owned(), + data: serde_json::Value::Null, + }); + + match message { + Ok(Message::Request(request)) => { + let server = Arc::clone(&self.server); + let output = Arc::clone(&self.output); + let handler = async move { + let response = await!(server.handle_request(request)); + let json = serde_json::to_string(&response).unwrap(); + let mut output = await!(output.lock()); + await!(output.send(json)); + }; + + self.pool.spawn(handler).unwrap(); + } + Ok(Message::Notification(notification)) => { + self.server.handle_notification(notification); + } + Ok(Message::Response(_)) => unimplemented!(), + Err(why) => { + let response = Response::new(serde_json::Value::Null, Some(why), None); + let json = serde_json::to_string(&response).unwrap(); + let mut output = await!(self.output.lock()); + await!(output.send(json)); + } + } + } + } +} diff --git a/jsonrpc/src/server.rs b/jsonrpc/src/server.rs index a6e8fa774..c19f81e34 100644 --- a/jsonrpc/src/server.rs +++ b/jsonrpc/src/server.rs @@ -1,9 +1,16 @@ use crate::types::*; +use futures::future::BoxFuture; use futures::prelude::*; use serde::de::DeserializeOwned; use serde::Serialize; use serde_json::json; +pub trait Server { + fn handle_request(&self, request: Request) -> BoxFuture<'_, Response>; + + fn handle_notification(&self, notification: Notification); +} + const DESERIALIZE_OBJECT_ERROR: &str = "Could not deserialize parameter object"; pub async fn handle_request<'a, H, F, I, O>(request: Request, handler: H) -> Response diff --git a/jsonrpc_derive/src/lib.rs b/jsonrpc_derive/src/lib.rs index c12fdcf6b..b2ee0f869 100644 --- a/jsonrpc_derive/src/lib.rs +++ b/jsonrpc_derive/src/lib.rs @@ -43,33 +43,38 @@ pub fn jsonrpc_server( } let self_ty = &impl_.self_ty; - let items = &impl_.items; let result = quote! { - impl #self_ty { - #(#items)* - - pub async fn handle_request(&self, request: jsonrpc::Request) -> jsonrpc::Response { - match request.method.as_str() { - #(#requests),*, - _ => { - let error = jsonrpc::Error { - code: jsonrpc::ErrorCode::MethodNotFound, - message: String::from("Method not found"), - data: serde_json::Value::Null, - }; - - jsonrpc::Response::new(serde_json::Value::Null, Some(error), Some(request.id)) + impl jsonrpc::Server for #self_ty { + fn handle_request(&self, request: jsonrpc::Request) + -> futures::future::BoxFuture<'_, jsonrpc::Response> { + use futures::prelude::*; + let handler = async move { + match request.method.as_str() { + #(#requests),*, + _ => { + let error = jsonrpc::Error { + code: jsonrpc::ErrorCode::MethodNotFound, + message: String::from("Method not found"), + data: serde_json::Value::Null, + }; + + jsonrpc::Response::new(serde_json::Value::Null, Some(error), Some(request.id)) + } } - } + }; + + handler.boxed() } - pub fn handle_notification(&self, notification: jsonrpc::Notification) { + fn handle_notification(&self, notification: jsonrpc::Notification) { match notification.method.as_str() { #(#notifications),*, _ => log::warn!("{}: {}", "Method not found", notification.method), } } } + + #impl_ }; result.into() diff --git a/src/lsp/codec.rs b/src/codec.rs similarity index 100% rename from src/lsp/codec.rs rename to src/codec.rs diff --git a/src/lib.rs b/src/lib.rs index fbf759ca2..dd5a09557 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -2,6 +2,7 @@ #![recursion_limit = "128"] pub mod build; +pub mod codec; pub mod completion; pub mod definition; pub mod feature; @@ -10,7 +11,6 @@ pub mod formatting; pub mod highlight; pub mod hover; pub mod link; -pub mod lsp; pub mod metadata; pub mod reference; pub mod rename; diff --git a/src/lsp/mod.rs b/src/lsp/mod.rs deleted file mode 100644 index cd2fde24b..000000000 --- a/src/lsp/mod.rs +++ /dev/null @@ -1,56 +0,0 @@ -mod codec; - -use crate::server::LatexLspServer; -use codec::LspCodec; -use futures::compat::*; -use futures::executor::ThreadPool; -use futures::lock::Mutex; -use futures::prelude::*; -use futures::task::*; -use jsonrpc::*; -use std::sync::Arc; -use tokio::codec::{FramedRead, FramedWrite}; -use tokio::prelude::{AsyncRead, AsyncWrite}; - -pub async fn listen(server: LatexLspServer, input: I, output: O, mut pool: ThreadPool) -where - I: AsyncRead + Send + Sync + 'static, - O: AsyncWrite + Send + Sync + 'static, -{ - let server = Arc::new(server); - let mut reader = FramedRead::new(input, LspCodec).compat(); - let writer = Arc::new(Mutex::new(FramedWrite::new(output, LspCodec).sink_compat())); - - while let Some(content) = await!(reader.next()) { - let message = - serde_json::from_str(&content.expect("Invalid message format")).map_err(|_| Error { - code: ErrorCode::ParseError, - message: "Could not parse the input".to_owned(), - data: serde_json::Value::Null, - }); - - match message { - Ok(Message::Request(request)) => { - let server = Arc::clone(&server); - let writer = Arc::clone(&writer); - let task = async move { - let response = await!(server.handle_request(request)); - let mut writer = await!(writer.lock()); - await!(writer.send(serde_json::to_string(&response).unwrap())) - .expect("Cannot write into output"); - }; - pool.spawn(task).unwrap(); - } - Ok(Message::Notification(notification)) => { - server.handle_notification(notification); - } - Ok(Message::Response(_)) => unimplemented!(), - Err(why) => { - let response = Response::new(serde_json::Value::Null, Some(why), None); - let mut writer = await!(writer.lock()); - await!(writer.send(serde_json::to_string(&response).unwrap())) - .expect("Cannot write into output"); - } - } - } -} diff --git a/src/main.rs b/src/main.rs index 209486de9..de4daac68 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,10 +1,14 @@ #![feature(await_macro, async_await)] use clap::*; +use futures::compat::*; use futures::executor::*; use futures::prelude::*; -use texlab::lsp; +use jsonrpc::MessageHandler; +use texlab::codec::LspCodec; use texlab::server::LatexLspServer; +use tokio::codec::FramedRead; +use tokio_codec::FramedWrite; use tokio_stdin_stdout; fn main() { @@ -41,6 +45,8 @@ async fn run(pool: ThreadPool) { let server = LatexLspServer::new(); let stdin = tokio_stdin_stdout::stdin(0); let stdout = tokio_stdin_stdout::stdout(0); - - await!(lsp::listen(server, stdin, stdout, pool)); + let input = FramedRead::new(stdin, LspCodec).compat(); + let output = FramedWrite::new(stdout, LspCodec).sink_compat(); + let mut handler = MessageHandler::new(server, input, output, pool); + await!(handler.listen()); }