Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Send LSP server init notification to frontend #71

Merged
merged 1 commit into from
Aug 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 8 additions & 10 deletions crates/amalthea/src/comm/lsp_comm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ use std::sync::Mutex;
use crossbeam::channel::Sender;
use serde::Deserialize;
use serde::Serialize;
use serde_json::json;

use crate::comm::comm_channel::CommChannelMsg;
use crate::error::Error;
Expand All @@ -25,7 +24,7 @@ pub struct StartLsp {

pub struct LspComm {
handler: Arc<Mutex<dyn LspHandler>>,
msg_tx: Sender<CommChannelMsg>,
_msg_tx: Sender<CommChannelMsg>,
}

/**
Expand All @@ -37,17 +36,16 @@ pub struct LspComm {
*/
impl LspComm {
pub fn new(handler: Arc<Mutex<dyn LspHandler>>, msg_tx: Sender<CommChannelMsg>) -> LspComm {
LspComm { handler, msg_tx }
LspComm {
handler,
_msg_tx: msg_tx,
}
}

pub fn start(&self, data: &StartLsp) -> Result<(), Error> {
pub fn start(&self, data: &StartLsp, conn_init_tx: Sender<bool>) -> Result<(), Error> {
let mut handler = self.handler.lock().unwrap();
handler.start(data.client_address.clone()).unwrap();
self.msg_tx
.send(CommChannelMsg::Data(json!({
"msg_type": "lsp_started",
"content": {}
})))
handler
.start(data.client_address.clone(), conn_init_tx)
.unwrap();
Ok(())
}
Expand Down
3 changes: 2 additions & 1 deletion crates/amalthea/src/language/lsp_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
*/

use async_trait::async_trait;
use crossbeam::channel::Sender;

use crate::error::Error;

Expand All @@ -15,5 +16,5 @@ use crate::error::Error;
#[async_trait]
pub trait LspHandler: Send {
/// Starts the LSP server and binds it to the given TCP address.
fn start(&mut self, tcp_address: String) -> Result<(), Error>;
fn start(&mut self, tcp_address: String, conn_init_tx: Sender<bool>) -> Result<(), Error>;
}
41 changes: 32 additions & 9 deletions crates/amalthea/src/socket/shell.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ use futures::executor::block_on;
use log::debug;
use log::trace;
use log::warn;
use serde_json::json;
use stdext::result::ResultOrLog;

use crate::comm::comm_channel::Comm;
use crate::comm::comm_channel::CommChannelMsg;
Expand Down Expand Up @@ -404,7 +406,12 @@ impl Shell {
let comm_id = req.content.comm_id.clone();
let comm_name = req.content.target_name.clone();
let comm_data = req.content.data.clone();
let comm_socket = CommSocket::new(CommInitiator::FrontEnd, comm_id, comm_name.clone());
let comm_socket =
CommSocket::new(CommInitiator::FrontEnd, comm_id.clone(), comm_name.clone());

// Optional notification channel used by server comms to indicate
// they are ready to accept connections
let mut conn_init_rx: Option<Receiver<bool>> = None;

// Create a routine to send messages to the front end over the IOPub
// channel. This routine will be passed to the comm channel so it can
Expand All @@ -415,6 +422,9 @@ impl Shell {
// If this is the special LSP comm, start the LSP server and create
// a comm that wraps it
Comm::Lsp => {
let (init_tx, init_rx) = crossbeam::channel::bounded::<bool>(1);
conn_init_rx = Some(init_rx);

if let Some(handler) = self.lsp_handler.clone() {
// Parse the data parameter to a StartLsp message. This is a
// message from the front end that contains the information
Expand All @@ -432,7 +442,7 @@ impl Shell {
// Create the new comm wrapper channel for the LSP and start
// the LSP server in a separate thread
let lsp_comm = LspComm::new(handler, comm_socket.outgoing_tx.clone());
lsp_comm.start(&start_lsp)?;
lsp_comm.start(&start_lsp, init_tx)?;
true
} else {
// If we don't have an LSP handler, return an error
Expand Down Expand Up @@ -476,14 +486,27 @@ impl Shell {
if opened {
// Send a notification to the comm message listener thread that a new
// comm has been opened
if let Err(err) = self
.comm_manager_tx
self.comm_manager_tx
.send(CommEvent::Opened(comm_socket.clone(), comm_data))
{
warn!(
"Failed to send '{}' comm open notification to listener thread: {}",
comm_socket.comm_name, err
);
.or_log_warning(&format!(
"Failed to send '{}' comm open notification to listener thread",
comm_socket.comm_name
));

// If the comm wraps a server, send notification once the
// server is ready to accept connections
if let Some(rx) = conn_init_rx {
let _ = rx.recv();
comm_socket
.outgoing_tx
.send(CommChannelMsg::Data(json!({
"msg_type": "server_started",
"content": {}
})))
.or_log_warning(&format!(
"Failed to send '{}' comm init notification to frontend comm",
comm_socket.comm_name
));
}
} else {
// If the comm was not opened, return an error to the caller
Expand Down
13 changes: 12 additions & 1 deletion crates/ark/src/lsp/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use log::*;
use parking_lot::Mutex;
use regex::Regex;
use serde_json::Value;
use stdext::result::ResultOrLog;
use stdext::*;
use tokio::net::TcpListener;
use tower_lsp::jsonrpc::Result;
Expand Down Expand Up @@ -574,14 +575,24 @@ impl Backend {
}

#[tokio::main]
pub async fn start_lsp(address: String, kernel_request_tx: Sender<KernelRequest>) {
pub async fn start_lsp(
address: String,
kernel_request_tx: Sender<KernelRequest>,
conn_init_tx: Sender<bool>,
) {
#[cfg(feature = "runtime-agnostic")]
use tokio_util::compat::TokioAsyncReadCompatExt;
#[cfg(feature = "runtime-agnostic")]
use tokio_util::compat::TokioAsyncWriteCompatExt;

debug!("Connecting to LSP at '{}'", &address);
let listener = TcpListener::bind(&address).await.unwrap();

// Notify frontend that we are ready to accept connections
conn_init_tx
.send(true)
.or_log_warning("Couldn't send LSP server init notification");

let (stream, _) = listener.accept().await.unwrap();
debug!("Connected to LSP at '{}'", address);
let (read, write) = tokio::io::split(stream);
Expand Down
9 changes: 7 additions & 2 deletions crates/ark/src/lsp/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,11 @@ impl Lsp {
}

impl LspHandler for Lsp {
fn start(&mut self, tcp_address: String) -> Result<(), amalthea::error::Error> {
fn start(
&mut self,
tcp_address: String,
conn_init_tx: Sender<bool>,
) -> Result<(), amalthea::error::Error> {
// If the kernel hasn't been initialized yet, wait for it to finish.
// This prevents the LSP from attempting to start up before the kernel
// is ready; on subsequent starts (reconnects), the kernel will already
Expand All @@ -48,8 +52,9 @@ impl LspHandler for Lsp {
}

let kernel_request_tx = self.kernel_request_tx.clone();

spawn!("ark-lsp", move || {
backend::start_lsp(tcp_address, kernel_request_tx)
backend::start_lsp(tcp_address, kernel_request_tx, conn_init_tx)
});
return Ok(());
}
Expand Down