diff --git a/jsonrpc/src/client.rs b/jsonrpc/src/client.rs new file mode 100644 index 000000000..907dc4a01 --- /dev/null +++ b/jsonrpc/src/client.rs @@ -0,0 +1,85 @@ +use crate::types::*; +use futures::channel::oneshot; +use futures::future::BoxFuture; +use futures::lock::Mutex; +use futures::prelude::*; +use serde::Serialize; +use serde_json::json; +use std::collections::HashMap; +use std::sync::atomic::{AtomicI32, Ordering}; +use std::sync::Arc; + +pub type Result = std::result::Result; + +pub type FutureResult<'a, T> = BoxFuture<'a, Result>; + +pub trait ResponseHandler { + fn handle(&self, response: Response) -> BoxFuture<'_, ()>; +} + +pub struct Client { + output: Arc>, + request_id: AtomicI32, + queue: Mutex>>>, +} + +impl Client +where + O: Sink + Unpin + Send, +{ + pub fn new(output: Arc>) -> Self { + Client { + output, + request_id: AtomicI32::new(0), + queue: Mutex::new(HashMap::new()), + } + } + + pub async fn send_request( + &self, + method: String, + params: T, + ) -> Result { + let id = self.request_id.fetch_add(1, Ordering::SeqCst); + let request = Request::new(method, json!(params), id); + + let (sender, receiver) = oneshot::channel(); + { + let mut queue = await!(self.queue.lock()); + queue.insert(request.id, sender); + } + + await!(self.send(Message::Request(request))); + await!(receiver).unwrap() + } + + pub async fn send_notification(&self, method: String, params: T) { + let notification = Notification::new(method, json!(params)); + await!(self.send(Message::Notification(notification))); + } + + async fn send(&self, message: Message) { + let json = serde_json::to_string(&message).unwrap(); + let mut output = await!(self.output.lock()); + await!(output.send(json)); + } +} + +impl ResponseHandler for Client +where + O: Sink + Unpin + Send, +{ + fn handle(&self, response: Response) -> BoxFuture<'_, ()> { + let task = async move { + let id = response.id.expect("Expected response with id"); + let mut queue = await!(self.queue.lock()); + let sender = queue.remove(&id).expect("Unexpected response received"); + + let error = response.error.clone(); + let result = response.result.ok_or_else(|| error.unwrap()); + sender.send(result).unwrap(); + }; + + task.boxed() + } +} diff --git a/jsonrpc/src/lib.rs b/jsonrpc/src/lib.rs index b8fa0f816..12d94adf7 100644 --- a/jsonrpc/src/lib.rs +++ b/jsonrpc/src/lib.rs @@ -1,10 +1,14 @@ #![feature(await_macro, async_await)] -mod server; +pub mod client; +pub mod server; mod types; -pub use self::server::*; -pub use self::types::*; +pub use self::{ + client::{Client, ResponseHandler}, + server::{handle_notification, handle_request, Server}, + types::*, +}; use futures::executor::ThreadPool; use futures::lock::Mutex; @@ -12,24 +16,33 @@ use futures::prelude::*; use futures::task::*; use std::sync::Arc; -pub struct MessageHandler { +pub struct MessageHandler { server: Arc, + response_handler: Arc, input: I, output: Arc>, pool: ThreadPool, } -impl MessageHandler +impl MessageHandler where S: Server + Send + Sync + 'static, + H: ResponseHandler + Send + Sync + 'static, I: Stream> + Unpin, O: Sink + Unpin + Send + 'static, { - pub fn new(server: S, input: I, output: O, pool: ThreadPool) -> Self { + pub fn new( + server: S, + response_handler: Arc, + input: I, + output: Arc>, + pool: ThreadPool, + ) -> Self { MessageHandler { server: Arc::new(server), + response_handler, input, - output: Arc::new(Mutex::new(output)), + output, pool, } } @@ -58,7 +71,9 @@ where Ok(Message::Notification(notification)) => { self.server.handle_notification(notification); } - Ok(Message::Response(response)) => unimplemented!("{:?}", response), + Ok(Message::Response(response)) => { + await!(self.response_handler.handle(response)); + } Err(why) => { let response = Response::error(why, None); let json = serde_json::to_string(&response).unwrap(); diff --git a/jsonrpc/src/server.rs b/jsonrpc/src/server.rs index 0f97a677f..db09c1a34 100644 --- a/jsonrpc/src/server.rs +++ b/jsonrpc/src/server.rs @@ -5,6 +5,8 @@ use serde::de::DeserializeOwned; use serde::Serialize; use serde_json::json; +pub type Result = std::result::Result; + pub trait Server { fn handle_request(&self, request: Request) -> BoxFuture<'_, Response>; diff --git a/jsonrpc/src/types.rs b/jsonrpc/src/types.rs index d2e20592a..10803d3df 100644 --- a/jsonrpc/src/types.rs +++ b/jsonrpc/src/types.rs @@ -3,6 +3,8 @@ use serde_repr::*; pub const PROTOCOL_VERSION: &str = "2.0"; +pub type Id = i32; + #[derive(Clone, Copy, Debug, Eq, PartialEq, Deserialize_repr, Serialize_repr)] #[repr(i32)] pub enum ErrorCode { @@ -30,7 +32,18 @@ pub struct Request { pub jsonrpc: String, pub method: String, pub params: serde_json::Value, - pub id: i32, + pub id: Id, +} + +impl Request { + pub fn new(method: String, params: serde_json::Value, id: Id) -> Self { + Request { + jsonrpc: PROTOCOL_VERSION.to_owned(), + method, + params, + id, + } + } } #[derive(Debug, Clone, PartialEq, Deserialize, Serialize)] @@ -43,11 +56,11 @@ pub struct Response { #[serde(skip_serializing_if = "Option::is_none")] pub error: Option, - pub id: Option, + pub id: Option, } impl Response { - pub fn result(result: serde_json::Value, id: i32) -> Self { + pub fn result(result: serde_json::Value, id: Id) -> Self { Response { jsonrpc: PROTOCOL_VERSION.to_owned(), result: Some(result), @@ -56,7 +69,7 @@ impl Response { } } - pub fn error(error: Error, id: Option) -> Self { + pub fn error(error: Error, id: Option) -> Self { Response { jsonrpc: PROTOCOL_VERSION.to_owned(), result: None, @@ -73,6 +86,16 @@ pub struct Notification { pub params: serde_json::Value, } +impl Notification { + pub fn new(method: String, params: serde_json::Value) -> Self { + Notification { + jsonrpc: PROTOCOL_VERSION.to_owned(), + method, + params, + } + } +} + #[derive(Debug, Clone, PartialEq, Deserialize, Serialize)] #[serde(untagged)] pub enum Message { @@ -80,5 +103,3 @@ pub enum Message { Notification(Notification), Response(Response), } - -pub type Result = std::result::Result; diff --git a/jsonrpc_derive/src/lib.rs b/jsonrpc_derive/src/lib.rs index 5d5c81bea..4c0412fe5 100644 --- a/jsonrpc_derive/src/lib.rs +++ b/jsonrpc_derive/src/lib.rs @@ -4,47 +4,75 @@ extern crate proc_macro; use quote::quote; -use syn; +use syn::export::TokenStream2; +use syn::*; macro_rules! unwrap { ($input:expr, $arm:pat => $value:expr) => {{ match $input { $arm => $value, - _ => unreachable!(), + _ => panic!(), } }}; } +enum MethodKind { + Request, + Notification, +} + +struct MethodMeta { + pub name: String, + pub kind: MethodKind, +} + +impl MethodMeta { + pub fn parse(attr: &Attribute) -> Self { + let meta = attr.parse_meta().unwrap(); + if meta.name() != "jsonrpc_method" { + panic!("Expected jsonrpc_method attribute"); + } + + let nested = unwrap!(meta, Meta::List(x) => x.nested); + let name = unwrap!(&nested[0], NestedMeta::Literal(Lit::Str(x)) => x.value()); + let kind = { + let lit = unwrap!(&nested[1], NestedMeta::Meta(Meta::NameValue(x)) => &x.lit); + let kind = unwrap!(lit, Lit::Str(x) => x.value()); + match kind.as_str() { + "request" => MethodKind::Request, + "notification" => MethodKind::Notification, + _ => panic!( + "Invalid method kind. Valid options are \"request\" and \"notification\"" + ), + } + }; + + Self { name, kind } + } +} + #[proc_macro_attribute] -pub fn jsonrpc_server( +pub fn jsonrpc_method( _attr: proc_macro::TokenStream, item: proc_macro::TokenStream, ) -> proc_macro::TokenStream { - let impl_: syn::ItemImpl = syn::parse_macro_input!(item); - let mut requests = Vec::new(); - let mut notifications = Vec::new(); - let methods = &get_methods(&impl_); - for method in methods { - let ident = &method.sig.ident; - let attribute = method.attrs.first().unwrap().clone(); - - let meta = attribute.parse_meta().unwrap(); - let meta_list = unwrap!(meta, syn::Meta::List(x) => x); - let meta_nested = meta_list.nested.first().unwrap(); - let meta_lit = unwrap!(meta_nested.value(), syn::NestedMeta::Literal(x) => x); - let name = unwrap!(meta_lit, syn::Lit::Str(x) => x.value()); - let name_str = name.as_str(); - - if is_request_method(&method) { - requests.push(quote!(#name_str => await!(self.#ident(request)))); - } else { - notifications.push(quote!(#name_str => self.#ident(notification))); - } - } + item +} +#[proc_macro_attribute] +pub fn jsonrpc_server( + _attr: proc_macro::TokenStream, + item: proc_macro::TokenStream, +) -> proc_macro::TokenStream { + let impl_: ItemImpl = parse_macro_input!(item); + let generics = &impl_.generics; let self_ty = &impl_.self_ty; - let result = quote! { - impl jsonrpc::Server for #self_ty { + let (requests, notifications) = generate_server_skeletons(&impl_.items); + + let tokens = quote! { + #impl_ + + impl #generics jsonrpc::Server for #self_ty { fn handle_request(&self, request: jsonrpc::Request) -> futures::future::BoxFuture<'_, jsonrpc::Response> { use futures::prelude::*; @@ -73,63 +101,147 @@ pub fn jsonrpc_server( } } } - - #impl_ }; - result.into() + tokens.into() } #[proc_macro_attribute] -pub fn jsonrpc_method( - _attr: proc_macro::TokenStream, +pub fn jsonrpc_client( + attr: proc_macro::TokenStream, item: proc_macro::TokenStream, ) -> proc_macro::TokenStream { - let method: syn::ImplItemMethod = syn::parse_macro_input!(item); - let name = &method.sig.ident; - let block = &method.block; - let param_decl = match &method.sig.decl.inputs[1] { - syn::FnArg::Captured(arg) => arg, - _ => panic!("Could not extract parameter type"), - }; - let param_name = ¶m_decl.pat; - let param_type = ¶m_decl.ty; - let return_ty = &method.sig.decl.output; - - let result = if is_request_method(&method) { - quote! { - pub async fn #name(&self, request: jsonrpc::Request) -> jsonrpc::Response { - let handler = async move |#param_name: #param_type| #return_ty #block; - await!(jsonrpc::handle_request(request, handler)) + let trait_: ItemTrait = parse_macro_input!(item); + let trait_ident = &trait_.ident; + let stubs = generate_client_stubs(&trait_.items); + let attr: AttributeArgs = parse_macro_input!(attr); + let struct_ident = unwrap!(attr.first().unwrap(), NestedMeta::Meta(Meta::Word(x)) => x); + + let tokens = quote! { + #trait_ + + pub struct #struct_ident { + client: std::sync::Arc> + } + + impl #struct_ident + where + O: futures::Sink + Unpin + Send, + { + pub fn new(output: std::sync::Arc>) -> Self { + Self { + client: std::sync::Arc::new(jsonrpc::Client::new(output)), + } } } - } else { - quote! { - pub fn #name(&self, notification: jsonrpc::Notification) { - let handler = move |#param_name: #param_type| #block; - jsonrpc::handle_notification(notification, handler); + + impl #trait_ident for #struct_ident + where + O: futures::Sink + Unpin + Send, + { + #(#stubs)* + } + + impl jsonrpc::ResponseHandler for #struct_ident + where + O: futures::Sink + Unpin + Send, + { + fn handle(&self, response: jsonrpc::Response) -> futures::future::BoxFuture<'_, ()> { + self.client.handle(response) } } }; - result.into() + tokens.into() } -fn get_methods(impl_: &syn::ItemImpl) -> Vec<&syn::ImplItemMethod> { - let mut methods = Vec::new(); - for item in &impl_.items { - let method = unwrap!(item, syn::ImplItem::Method(x) => x); - if !method.attrs.is_empty() { - methods.push(method); +fn generate_server_skeletons(items: &Vec) -> (Vec, Vec) { + let mut requests = Vec::new(); + let mut notifications = Vec::new(); + for item in items { + let method = unwrap!(item, ImplItem::Method(x) => x); + if method.attrs.is_empty() { + continue; + } + + let ident = &method.sig.ident; + let return_ty = &method.sig.decl.output; + let param_ty = unwrap!(&method.sig.decl.inputs[1], FnArg::Captured(x) => &x.ty); + let meta = MethodMeta::parse(method.attrs.first().unwrap()); + let name = &meta.name.as_str(); + + match meta.kind { + MethodKind::Request => { + requests.push(quote!( + #name => { + let handler = async move |param: #param_ty| #return_ty { + await!(self.#ident(param)) + }; + + await!(jsonrpc::handle_request(request, handler)) + } + )); + } + MethodKind::Notification => { + notifications.push(quote!( + #name => { + let handler = move |param: #param_ty| { + self.#ident(param); + }; + + jsonrpc::handle_notification(notification, handler); + } + )); + } } } - methods + (requests, notifications) } -fn is_request_method(method: &syn::ImplItemMethod) -> bool { - match method.sig.decl.output { - syn::ReturnType::Type(_, _) => true, - syn::ReturnType::Default => false, +fn generate_client_stubs(items: &Vec) -> Vec { + let mut stubs = Vec::new(); + for item in items { + let method = unwrap!(item, TraitItem::Method(x) => x); + let sig = &method.sig; + let param = unwrap!(&sig.decl.inputs[1], FnArg::Captured(x) => &x.pat); + let meta = MethodMeta::parse(method.attrs.first().unwrap()); + let name = &meta.name; + + let stub = match meta.kind { + MethodKind::Request => quote!( + #sig { + use futures::prelude::*; + + let client = std::sync::Arc::clone(&self.client); + let task = async move { + let result = await!(client.send_request(#name.to_owned(), #param))?; + serde_json::from_value(result).map_err(|_| jsonrpc::Error { + code: jsonrpc::ErrorCode::InvalidParams, + message: "Could not deserialize parameter object".to_owned(), + data: serde_json::Value::Null, + }) + }; + + task.boxed() + } + ), + MethodKind::Notification => quote!( + #sig { + use futures::prelude::*; + + let client = std::sync::Arc::clone(&self.client); + let task = async move { + await!(self.client.send_notification(#name.to_owned(), #param)); + }; + + task.boxed() + } + ), + }; + + stubs.push(stub); } + + stubs } diff --git a/src/client.rs b/src/client.rs new file mode 100644 index 000000000..c5955e467 --- /dev/null +++ b/src/client.rs @@ -0,0 +1,13 @@ +use futures::future::BoxFuture; +use jsonrpc::client::FutureResult; +use jsonrpc_derive::{jsonrpc_client, jsonrpc_method}; +use lsp_types::{ConfigurationParams, ShowMessageParams}; + +#[jsonrpc_client(LatexLspClient)] +pub trait LspClient { + #[jsonrpc_method("workspace/configuration", kind = "request")] + fn configuration(&self, params: ConfigurationParams) -> FutureResult<'_, serde_json::Value>; + + #[jsonrpc_method("window/showMessage", kind = "notification")] + fn show_message(&self, params: ShowMessageParams) -> BoxFuture<'_, ()>; +} diff --git a/src/lib.rs b/src/lib.rs index 36c5a81d4..309fa3703 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -2,6 +2,7 @@ #![recursion_limit = "128"] pub mod build; +pub mod client; pub mod codec; pub mod completion; pub mod data; diff --git a/src/main.rs b/src/main.rs index de4daac68..195a05476 100644 --- a/src/main.rs +++ b/src/main.rs @@ -3,8 +3,11 @@ use clap::*; use futures::compat::*; use futures::executor::*; +use futures::lock::Mutex; use futures::prelude::*; use jsonrpc::MessageHandler; +use std::sync::Arc; +use texlab::client::LatexLspClient; use texlab::codec::LspCodec; use texlab::server::LatexLspServer; use tokio::codec::FramedRead; @@ -42,11 +45,12 @@ fn main() { } async fn run(pool: ThreadPool) { - let server = LatexLspServer::new(); let stdin = tokio_stdin_stdout::stdin(0); let stdout = tokio_stdin_stdout::stdout(0); let input = FramedRead::new(stdin, LspCodec).compat(); - let output = FramedWrite::new(stdout, LspCodec).sink_compat(); - let mut handler = MessageHandler::new(server, input, output, pool); + let output = Arc::new(Mutex::new(FramedWrite::new(stdout, LspCodec).sink_compat())); + let client = Arc::new(LatexLspClient::new(Arc::clone(&output))); + let server = LatexLspServer::new(Arc::clone(&client)); + let mut handler = MessageHandler::new(server, client, input, output, pool); await!(handler.listen()); } diff --git a/src/server.rs b/src/server.rs index 28d222505..82ce8baa0 100644 --- a/src/server.rs +++ b/src/server.rs @@ -1,3 +1,4 @@ +use crate::client::LspClient; use crate::completion::latex::data::types::LatexComponentDatabase; use crate::completion::CompletionProvider; use crate::definition::DefinitionProvider; @@ -10,26 +11,28 @@ use crate::reference::ReferenceProvider; use crate::rename::RenameProvider; use crate::request; use crate::workspace::WorkspaceManager; -use jsonrpc::Result; +use jsonrpc::server::Result; use jsonrpc_derive::{jsonrpc_method, jsonrpc_server}; use log::*; use lsp_types::*; use std::sync::Arc; use walkdir::WalkDir; -pub struct LatexLspServer { +pub struct LatexLspServer { + client: Arc, workspace_manager: WorkspaceManager, } #[jsonrpc_server] -impl LatexLspServer { - pub fn new() -> Self { +impl LatexLspServer { + pub fn new(client: Arc) -> Self { LatexLspServer { + client, workspace_manager: WorkspaceManager::new(), } } - #[jsonrpc_method("initialize")] + #[jsonrpc_method("initialize", kind = "request")] pub async fn initialize(&self, params: InitializeParams) -> Result { if let Some(Ok(path)) = params.root_uri.map(|x| x.to_file_path()) { for entry in WalkDir::new(path) @@ -88,26 +91,26 @@ impl LatexLspServer { Ok(InitializeResult { capabilities }) } - #[jsonrpc_method("initialized")] + #[jsonrpc_method("initialized", kind = "notification")] pub fn initialized(&self, params: InitializedParams) {} - #[jsonrpc_method("shutdown")] + #[jsonrpc_method("shutdown", kind = "request")] pub async fn shutdown(&self, params: ()) -> Result<()> { Ok(()) } - #[jsonrpc_method("exit")] + #[jsonrpc_method("exit", kind = "notification")] pub fn exit(&self, params: ()) {} - #[jsonrpc_method("workspace/didChangeWatchedFiles")] + #[jsonrpc_method("workspace/didChangeWatchedFiles", kind = "notification")] pub fn did_change_watched_files(&self, params: DidChangeWatchedFilesParams) {} - #[jsonrpc_method("textDocument/didOpen")] + #[jsonrpc_method("textDocument/didOpen", kind = "notification")] pub fn did_open(&self, params: DidOpenTextDocumentParams) { self.workspace_manager.add(params.text_document); } - #[jsonrpc_method("textDocument/didChange")] + #[jsonrpc_method("textDocument/didChange", kind = "notification")] pub fn did_change(&self, params: DidChangeTextDocumentParams) { for change in params.content_changes { let uri = params.text_document.uri.clone(); @@ -115,13 +118,13 @@ impl LatexLspServer { } } - #[jsonrpc_method("textDocument/didSave")] + #[jsonrpc_method("textDocument/didSave", kind = "notification")] pub fn did_save(&self, params: DidSaveTextDocumentParams) {} - #[jsonrpc_method("textDocument/didClose")] + #[jsonrpc_method("textDocument/didClose", kind = "notification")] pub fn did_close(&self, params: DidCloseTextDocumentParams) {} - #[jsonrpc_method("textDocument/completion")] + #[jsonrpc_method("textDocument/completion", kind = "request")] pub async fn completion(&self, params: CompletionParams) -> Result { let request = request!(self, params)?; let items = await!(CompletionProvider::execute(&request)); @@ -135,33 +138,33 @@ impl LatexLspServer { }) } - #[jsonrpc_method("completionItem/resolve")] + #[jsonrpc_method("completionItem/resolve", kind = "request")] pub async fn completion_resolve(&self, item: CompletionItem) -> Result { Ok(item) } - #[jsonrpc_method("textDocument/hover")] + #[jsonrpc_method("textDocument/hover", kind = "request")] pub async fn hover(&self, params: TextDocumentPositionParams) -> Result> { let request = request!(self, params)?; let hover = await!(HoverProvider::execute(&request)); Ok(hover) } - #[jsonrpc_method("textDocument/definition")] + #[jsonrpc_method("textDocument/definition", kind = "request")] pub async fn definition(&self, params: TextDocumentPositionParams) -> Result> { let request = request!(self, params)?; let results = await!(DefinitionProvider::execute(&request)); Ok(results) } - #[jsonrpc_method("textDocument/references")] + #[jsonrpc_method("textDocument/references", kind = "request")] pub async fn references(&self, params: ReferenceParams) -> Result> { let request = request!(self, params)?; let results = await!(ReferenceProvider::execute(&request)); Ok(results) } - #[jsonrpc_method("textDocument/documentHighlight")] + #[jsonrpc_method("textDocument/documentHighlight", kind = "request")] pub async fn document_highlight( &self, params: TextDocumentPositionParams, @@ -171,7 +174,7 @@ impl LatexLspServer { Ok(results) } - #[jsonrpc_method("textDocument/documentSymbol")] + #[jsonrpc_method("textDocument/documentSymbol", kind = "request")] pub async fn document_symbol( &self, params: DocumentSymbolParams, @@ -179,26 +182,26 @@ impl LatexLspServer { Ok(Vec::new()) } - #[jsonrpc_method("textDocument/documentLink")] + #[jsonrpc_method("textDocument/documentLink", kind = "request")] pub async fn document_link(&self, params: DocumentLinkParams) -> Result> { let request = request!(self, params)?; let links = await!(LinkProvider::execute(&request)); Ok(links) } - #[jsonrpc_method("textDocument/formatting")] + #[jsonrpc_method("textDocument/formatting", kind = "request")] pub async fn formatting(&self, params: DocumentFormattingParams) -> Result> { Ok(Vec::new()) } - #[jsonrpc_method("textDocument/rename")] + #[jsonrpc_method("textDocument/rename", kind = "request")] pub async fn rename(&self, params: RenameParams) -> Result> { let request = request!(self, params)?; let edit = await!(RenameProvider::execute(&request)); Ok(edit) } - #[jsonrpc_method("textDocument/foldingRange")] + #[jsonrpc_method("textDocument/foldingRange", kind = "request")] pub async fn folding_range(&self, params: FoldingRangeParams) -> Result> { let request = request!(self, params)?; let foldings = await!(FoldingProvider::execute(&request));