diff --git a/crates/amalthea/src/comm/lsp_comm.rs b/crates/amalthea/src/comm/lsp_comm.rs index ec5da20ea..6034c70cf 100644 --- a/crates/amalthea/src/comm/lsp_comm.rs +++ b/crates/amalthea/src/comm/lsp_comm.rs @@ -11,7 +11,6 @@ use std::sync::Mutex; use crossbeam::channel::Sender; use serde::Deserialize; use serde::Serialize; -use serde_json::json; use crate::comm::comm_channel::CommChannelMsg; use crate::error::Error; @@ -25,7 +24,7 @@ pub struct StartLsp { pub struct LspComm { handler: Arc>, - msg_tx: Sender, + _msg_tx: Sender, } /** @@ -37,17 +36,16 @@ pub struct LspComm { */ impl LspComm { pub fn new(handler: Arc>, msg_tx: Sender) -> LspComm { - LspComm { handler, msg_tx } + LspComm { + handler, + _msg_tx: msg_tx, + } } - pub fn start(&self, data: &StartLsp) -> Result<(), Error> { + pub fn start(&self, data: &StartLsp, conn_init_tx: Sender) -> Result<(), Error> { let mut handler = self.handler.lock().unwrap(); - handler.start(data.client_address.clone()).unwrap(); - self.msg_tx - .send(CommChannelMsg::Data(json!({ - "msg_type": "lsp_started", - "content": {} - }))) + handler + .start(data.client_address.clone(), conn_init_tx) .unwrap(); Ok(()) } diff --git a/crates/amalthea/src/language/lsp_handler.rs b/crates/amalthea/src/language/lsp_handler.rs index 55df0b564..91226ee31 100644 --- a/crates/amalthea/src/language/lsp_handler.rs +++ b/crates/amalthea/src/language/lsp_handler.rs @@ -6,6 +6,7 @@ */ use async_trait::async_trait; +use crossbeam::channel::Sender; use crate::error::Error; @@ -15,5 +16,5 @@ use crate::error::Error; #[async_trait] pub trait LspHandler: Send { /// Starts the LSP server and binds it to the given TCP address. - fn start(&mut self, tcp_address: String) -> Result<(), Error>; + fn start(&mut self, tcp_address: String, conn_init_tx: Sender) -> Result<(), Error>; } diff --git a/crates/amalthea/src/socket/shell.rs b/crates/amalthea/src/socket/shell.rs index eb6f483a2..06bf3a716 100644 --- a/crates/amalthea/src/socket/shell.rs +++ b/crates/amalthea/src/socket/shell.rs @@ -15,6 +15,8 @@ use futures::executor::block_on; use log::debug; use log::trace; use log::warn; +use serde_json::json; +use stdext::result::ResultOrLog; use crate::comm::comm_channel::Comm; use crate::comm::comm_channel::CommChannelMsg; @@ -404,7 +406,12 @@ impl Shell { let comm_id = req.content.comm_id.clone(); let comm_name = req.content.target_name.clone(); let comm_data = req.content.data.clone(); - let comm_socket = CommSocket::new(CommInitiator::FrontEnd, comm_id, comm_name.clone()); + let comm_socket = + CommSocket::new(CommInitiator::FrontEnd, comm_id.clone(), comm_name.clone()); + + // Optional notification channel used by server comms to indicate + // they are ready to accept connections + let mut conn_init_rx: Option> = None; // Create a routine to send messages to the front end over the IOPub // channel. This routine will be passed to the comm channel so it can @@ -415,6 +422,9 @@ impl Shell { // If this is the special LSP comm, start the LSP server and create // a comm that wraps it Comm::Lsp => { + let (init_tx, init_rx) = crossbeam::channel::bounded::(1); + conn_init_rx = Some(init_rx); + if let Some(handler) = self.lsp_handler.clone() { // Parse the data parameter to a StartLsp message. This is a // message from the front end that contains the information @@ -432,7 +442,7 @@ impl Shell { // Create the new comm wrapper channel for the LSP and start // the LSP server in a separate thread let lsp_comm = LspComm::new(handler, comm_socket.outgoing_tx.clone()); - lsp_comm.start(&start_lsp)?; + lsp_comm.start(&start_lsp, init_tx)?; true } else { // If we don't have an LSP handler, return an error @@ -476,14 +486,27 @@ impl Shell { if opened { // Send a notification to the comm message listener thread that a new // comm has been opened - if let Err(err) = self - .comm_manager_tx + self.comm_manager_tx .send(CommEvent::Opened(comm_socket.clone(), comm_data)) - { - warn!( - "Failed to send '{}' comm open notification to listener thread: {}", - comm_socket.comm_name, err - ); + .or_log_warning(&format!( + "Failed to send '{}' comm open notification to listener thread", + comm_socket.comm_name + )); + + // If the comm wraps a server, send notification once the + // server is ready to accept connections + if let Some(rx) = conn_init_rx { + let _ = rx.recv(); + comm_socket + .outgoing_tx + .send(CommChannelMsg::Data(json!({ + "msg_type": "server_started", + "content": {} + }))) + .or_log_warning(&format!( + "Failed to send '{}' comm init notification to frontend comm", + comm_socket.comm_name + )); } } else { // If the comm was not opened, return an error to the caller diff --git a/crates/ark/src/lsp/backend.rs b/crates/ark/src/lsp/backend.rs index 8b809cb68..67281ee0b 100644 --- a/crates/ark/src/lsp/backend.rs +++ b/crates/ark/src/lsp/backend.rs @@ -18,6 +18,7 @@ use log::*; use parking_lot::Mutex; use regex::Regex; use serde_json::Value; +use stdext::result::ResultOrLog; use stdext::*; use tokio::net::TcpListener; use tower_lsp::jsonrpc::Result; @@ -574,7 +575,11 @@ impl Backend { } #[tokio::main] -pub async fn start_lsp(address: String, kernel_request_tx: Sender) { +pub async fn start_lsp( + address: String, + kernel_request_tx: Sender, + conn_init_tx: Sender, +) { #[cfg(feature = "runtime-agnostic")] use tokio_util::compat::TokioAsyncReadCompatExt; #[cfg(feature = "runtime-agnostic")] @@ -582,6 +587,12 @@ pub async fn start_lsp(address: String, kernel_request_tx: Sender debug!("Connecting to LSP at '{}'", &address); let listener = TcpListener::bind(&address).await.unwrap(); + + // Notify frontend that we are ready to accept connections + conn_init_tx + .send(true) + .or_log_warning("Couldn't send LSP server init notification"); + let (stream, _) = listener.accept().await.unwrap(); debug!("Connected to LSP at '{}'", address); let (read, write) = tokio::io::split(stream); diff --git a/crates/ark/src/lsp/handler.rs b/crates/ark/src/lsp/handler.rs index 65b5e977c..a08846797 100644 --- a/crates/ark/src/lsp/handler.rs +++ b/crates/ark/src/lsp/handler.rs @@ -34,7 +34,11 @@ impl Lsp { } impl LspHandler for Lsp { - fn start(&mut self, tcp_address: String) -> Result<(), amalthea::error::Error> { + fn start( + &mut self, + tcp_address: String, + conn_init_tx: Sender, + ) -> Result<(), amalthea::error::Error> { // If the kernel hasn't been initialized yet, wait for it to finish. // This prevents the LSP from attempting to start up before the kernel // is ready; on subsequent starts (reconnects), the kernel will already @@ -48,8 +52,9 @@ impl LspHandler for Lsp { } let kernel_request_tx = self.kernel_request_tx.clone(); + spawn!("ark-lsp", move || { - backend::start_lsp(tcp_address, kernel_request_tx) + backend::start_lsp(tcp_address, kernel_request_tx, conn_init_tx) }); return Ok(()); }