Skip to content

Commit

Permalink
Send LSP server init notification to frontend
Browse files Browse the repository at this point in the history
  • Loading branch information
lionel- committed Aug 21, 2023
1 parent eee0268 commit 360f002
Show file tree
Hide file tree
Showing 5 changed files with 61 additions and 23 deletions.
18 changes: 8 additions & 10 deletions crates/amalthea/src/comm/lsp_comm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ use std::sync::Mutex;
use crossbeam::channel::Sender;
use serde::Deserialize;
use serde::Serialize;
use serde_json::json;

use crate::comm::comm_channel::CommChannelMsg;
use crate::error::Error;
Expand All @@ -25,7 +24,7 @@ pub struct StartLsp {

pub struct LspComm {
handler: Arc<Mutex<dyn LspHandler>>,
msg_tx: Sender<CommChannelMsg>,
_msg_tx: Sender<CommChannelMsg>,
}

/**
Expand All @@ -37,17 +36,16 @@ pub struct LspComm {
*/
impl LspComm {
pub fn new(handler: Arc<Mutex<dyn LspHandler>>, msg_tx: Sender<CommChannelMsg>) -> LspComm {
LspComm { handler, msg_tx }
LspComm {
handler,
_msg_tx: msg_tx,
}
}

pub fn start(&self, data: &StartLsp) -> Result<(), Error> {
pub fn start(&self, data: &StartLsp, conn_init_tx: Sender<bool>) -> Result<(), Error> {
let mut handler = self.handler.lock().unwrap();
handler.start(data.client_address.clone()).unwrap();
self.msg_tx
.send(CommChannelMsg::Data(json!({
"msg_type": "lsp_started",
"content": {}
})))
handler
.start(data.client_address.clone(), conn_init_tx)
.unwrap();
Ok(())
}
Expand Down
3 changes: 2 additions & 1 deletion crates/amalthea/src/language/lsp_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
*/

use async_trait::async_trait;
use crossbeam::channel::Sender;

use crate::error::Error;

Expand All @@ -15,5 +16,5 @@ use crate::error::Error;
#[async_trait]
pub trait LspHandler: Send {
/// Starts the LSP server and binds it to the given TCP address.
fn start(&mut self, tcp_address: String) -> Result<(), Error>;
fn start(&mut self, tcp_address: String, conn_init_tx: Sender<bool>) -> Result<(), Error>;
}
41 changes: 32 additions & 9 deletions crates/amalthea/src/socket/shell.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ use futures::executor::block_on;
use log::debug;
use log::trace;
use log::warn;
use serde_json::json;
use stdext::result::ResultOrLog;

use crate::comm::comm_channel::Comm;
use crate::comm::comm_channel::CommChannelMsg;
Expand Down Expand Up @@ -404,7 +406,12 @@ impl Shell {
let comm_id = req.content.comm_id.clone();
let comm_name = req.content.target_name.clone();
let comm_data = req.content.data.clone();
let comm_socket = CommSocket::new(CommInitiator::FrontEnd, comm_id, comm_name.clone());
let comm_socket =
CommSocket::new(CommInitiator::FrontEnd, comm_id.clone(), comm_name.clone());

// Optional notification channel used by server comms to indicate
// they are ready to accept connections
let mut conn_init_rx: Option<Receiver<bool>> = None;

// Create a routine to send messages to the front end over the IOPub
// channel. This routine will be passed to the comm channel so it can
Expand All @@ -415,6 +422,9 @@ impl Shell {
// If this is the special LSP comm, start the LSP server and create
// a comm that wraps it
Comm::Lsp => {
let (init_tx, init_rx) = crossbeam::channel::bounded::<bool>(1);
conn_init_rx = Some(init_rx);

if let Some(handler) = self.lsp_handler.clone() {
// Parse the data parameter to a StartLsp message. This is a
// message from the front end that contains the information
Expand All @@ -432,7 +442,7 @@ impl Shell {
// Create the new comm wrapper channel for the LSP and start
// the LSP server in a separate thread
let lsp_comm = LspComm::new(handler, comm_socket.outgoing_tx.clone());
lsp_comm.start(&start_lsp)?;
lsp_comm.start(&start_lsp, init_tx)?;
true
} else {
// If we don't have an LSP handler, return an error
Expand Down Expand Up @@ -476,14 +486,27 @@ impl Shell {
if opened {
// Send a notification to the comm message listener thread that a new
// comm has been opened
if let Err(err) = self
.comm_manager_tx
self.comm_manager_tx
.send(CommEvent::Opened(comm_socket.clone(), comm_data))
{
warn!(
"Failed to send '{}' comm open notification to listener thread: {}",
comm_socket.comm_name, err
);
.or_log_warning(&format!(
"Failed to send '{}' comm open notification to listener thread",
comm_socket.comm_name
));

// If the comm wraps a server, send notification once the
// server is ready to accept connections
if let Some(rx) = conn_init_rx {
let _ = rx.recv();
comm_socket
.outgoing_tx
.send(CommChannelMsg::Data(json!({
"msg_type": "server_started",
"content": {}
})))
.or_log_warning(&format!(
"Failed to send '{}' comm init notification to frontend comm",
comm_socket.comm_name
));
}
} else {
// If the comm was not opened, return an error to the caller
Expand Down
13 changes: 12 additions & 1 deletion crates/ark/src/lsp/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use log::*;
use parking_lot::Mutex;
use regex::Regex;
use serde_json::Value;
use stdext::result::ResultOrLog;
use stdext::*;
use tokio::net::TcpListener;
use tower_lsp::jsonrpc::Result;
Expand Down Expand Up @@ -574,14 +575,24 @@ impl Backend {
}

#[tokio::main]
pub async fn start_lsp(address: String, kernel_request_tx: Sender<KernelRequest>) {
pub async fn start_lsp(
address: String,
kernel_request_tx: Sender<KernelRequest>,
conn_init_tx: Sender<bool>,
) {
#[cfg(feature = "runtime-agnostic")]
use tokio_util::compat::TokioAsyncReadCompatExt;
#[cfg(feature = "runtime-agnostic")]
use tokio_util::compat::TokioAsyncWriteCompatExt;

debug!("Connecting to LSP at '{}'", &address);
let listener = TcpListener::bind(&address).await.unwrap();

// Notify frontend that we are ready to accept connections
conn_init_tx
.send(true)
.or_log_warning("Couldn't send LSP server init notification");

let (stream, _) = listener.accept().await.unwrap();
debug!("Connected to LSP at '{}'", address);
let (read, write) = tokio::io::split(stream);
Expand Down
9 changes: 7 additions & 2 deletions crates/ark/src/lsp/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,11 @@ impl Lsp {
}

impl LspHandler for Lsp {
fn start(&mut self, tcp_address: String) -> Result<(), amalthea::error::Error> {
fn start(
&mut self,
tcp_address: String,
conn_init_tx: Sender<bool>,
) -> Result<(), amalthea::error::Error> {
// If the kernel hasn't been initialized yet, wait for it to finish.
// This prevents the LSP from attempting to start up before the kernel
// is ready; on subsequent starts (reconnects), the kernel will already
Expand All @@ -48,8 +52,9 @@ impl LspHandler for Lsp {
}

let kernel_request_tx = self.kernel_request_tx.clone();

spawn!("ark-lsp", move || {
backend::start_lsp(tcp_address, kernel_request_tx)
backend::start_lsp(tcp_address, kernel_request_tx, conn_init_tx)
});
return Ok(());
}
Expand Down

0 comments on commit 360f002

Please sign in to comment.