Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement JEP 65 #577

Merged
merged 8 commits into from
Oct 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion crates/amalthea/src/comm/comm_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use crate::comm::event::CommManagerRequest;
use crate::socket::comm::CommInitiator;
use crate::socket::comm::CommSocket;
use crate::socket::iopub::IOPubMessage;
use crate::wire::comm_close::CommClose;
use crate::wire::comm_msg::CommWireMsg;
use crate::wire::comm_open::CommOpen;
use crate::wire::header::JupyterHeader;
Expand Down Expand Up @@ -245,7 +246,9 @@ impl CommManager {
}
},

CommMsg::Close => IOPubMessage::CommClose(comm_socket.comm_id.clone()),
CommMsg::Close => IOPubMessage::CommClose(CommClose {
comm_id: comm_socket.comm_id.clone(),
}),
};

// Deliver the message to the frontend
Expand Down
27 changes: 14 additions & 13 deletions crates/amalthea/src/fixtures/dummy_frontend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,10 +162,6 @@ impl DummyFrontend {
)
.unwrap();

// Subscribe to IOPub! Server is the one that sent us this port,
// so its already connected on its end.
iopub_socket.subscribe().unwrap();

let stdin_socket = Socket::new(
connection.session.clone(),
connection.ctx.clone(),
Expand All @@ -186,14 +182,19 @@ impl DummyFrontend {
)
.unwrap();

// TODO!: Without this sleep, `IOPub` `Busy` messages sporadically
// don't arrive when running integration tests. I believe this is a result
// of PUB sockets dropping messages while in a "mute" state (i.e. no subscriber
// connected yet). Even though we run `iopub_socket.subscribe()` to subscribe,
// it seems like we can return from this function even before our socket
// has fully subscribed, causing messages to get dropped.
// https://libzmq.readthedocs.io/en/latest/zmq_socket.html
std::thread::sleep(std::time::Duration::from_millis(500));
// Subscribe to IOPub! Server's XPUB socket will receive a notification of
// our subscription with `subscription`, then will publish an IOPub `Welcome`
// message, sending back our `subscription`.
iopub_socket.subscribe(b"").unwrap();

// Immediately block until we've received the IOPub welcome message.
// This confirms that we've fully subscribed and avoids dropping any
// of the initial IOPub messages that a server may send if we start
// perform requests immediately.
// https://github.com/posit-dev/ark/pull/577
assert_matches!(Self::recv(&iopub_socket), Message::Welcome(data) => {
assert_eq!(data.content.subscription, String::from(""));
});

Self {
_control_socket,
Expand Down Expand Up @@ -347,7 +348,7 @@ impl DummyFrontend {
let msg = self.recv_iopub();

// Assert its type
let piece = assert_matches!(msg, Message::StreamOutput(data) => {
let piece = assert_matches!(msg, Message::Stream(data) => {
assert_eq!(data.content.name, stream);
data.content.text
});
Expand Down
90 changes: 74 additions & 16 deletions crates/amalthea/src/kernel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ use crate::wire::jupyter_message::JupyterMessage;
use crate::wire::jupyter_message::Message;
use crate::wire::jupyter_message::OutboundMessage;
use crate::wire::jupyter_message::Status;
use crate::wire::subscription_message::SubscriptionMessage;

macro_rules! report_error {
($($arg:tt)+) => (if cfg!(debug_assertions) { panic!($($arg)+) } else { log::error!($($arg)+) })
Expand Down Expand Up @@ -118,20 +119,25 @@ pub fn connect(
)
});

// Create the IOPub PUB/SUB socket and start a thread to broadcast to
// Create the IOPub XPUB/SUB socket and start a thread to broadcast to
// the client. IOPub only broadcasts messages, so it listens to other
// threads on a Receiver<Message> instead of to the client.
let iopub_socket = Socket::new(
session.clone(),
ctx.clone(),
String::from("IOPub"),
zmq::PUB,
zmq::XPUB,
None,
connection_file.endpoint(connection_file.iopub_port),
)?;
let iopub_port = port_finalize(&iopub_socket, connection_file.iopub_port)?;

let (iopub_inbound_tx, iopub_inbound_rx) = unbounded();
let iopub_session = iopub_socket.session.clone();
let iopub_outbound_tx = outbound_tx.clone();

spawn!(format!("{name}-iopub"), move || {
iopub_thread(iopub_socket, iopub_rx)
iopub_thread(iopub_rx, iopub_inbound_rx, iopub_outbound_tx, iopub_session)
});

// Create the heartbeat socket and start a thread to listen for
Expand Down Expand Up @@ -165,11 +171,12 @@ pub fn connect(
let (stdin_inbound_tx, stdin_inbound_rx) = unbounded();
let (stdin_interrupt_tx, stdin_interrupt_rx) = bounded(1);
let stdin_session = stdin_socket.session.clone();
let stdin_outbound_tx = outbound_tx.clone();

spawn!(format!("{name}-stdin"), move || {
stdin_thread(
stdin_inbound_rx,
outbound_tx,
stdin_outbound_tx,
stdin_request_rx,
stdin_reply_tx,
stdin_interrupt_rx,
Expand Down Expand Up @@ -224,6 +231,8 @@ pub fn connect(
outbound_notif_socket_rx,
stdin_socket,
stdin_inbound_tx,
iopub_socket,
iopub_inbound_tx,
outbound_rx_clone,
)
});
Expand Down Expand Up @@ -338,8 +347,13 @@ fn shell_thread(
}

/// Starts the IOPub thread.
fn iopub_thread(socket: Socket, receiver: Receiver<IOPubMessage>) -> Result<(), Error> {
let mut iopub = IOPub::new(socket, receiver);
fn iopub_thread(
rx: Receiver<IOPubMessage>,
inbound_rx: Receiver<crate::Result<SubscriptionMessage>>,
outbound_tx: Sender<OutboundMessage>,
session: Session,
) -> Result<(), Error> {
let mut iopub = IOPub::new(rx, inbound_rx, outbound_tx, session);
iopub.listen();
Ok(())
}
Expand Down Expand Up @@ -367,10 +381,32 @@ fn stdin_thread(

/// Starts the thread that forwards 0MQ messages to Amalthea channels
/// and vice versa.
///
/// This is a solution to the problem of polling/selecting from 0MQ sockets and
/// crossbeam channels at the same time. Message events on crossbeam channels
/// are emitted by the notifier thread (see below) on a 0MQ socket. The
/// forwarding thread is then able to listen on 0MQ sockets (e.g. StdIn replies
/// and IOPub subscriptions) and the notification socket at the same time.
///
/// Part of the problem this setup solves is that 0MQ sockets can only be owned
/// by one thread at a time. Take IOPUb as an example: we need to listen on that
/// socket for subscription events. We also need to listen for new IOPub
/// messages to send to the client, sent via Crossbeam channels. So we need at
/// least two threads listening for these two different kinds of events. But the
/// forwarding thread has to fully own the socket to be able to listen to it. So
/// it's also in charge of sending IOPub messages on that socket. When an IOPub
/// message comes in, the notifier thread wakes up the forwarding thread which
/// then pulls messages from the channel and forwards them to the IOPub socket.
///
/// Terminology:
/// - Outbound means that a crossbeam message needs to be forwarded to a 0MQ socket.
/// - Inbound means that a 0MQ message needs to be forwarded to a crossbeam channel.
fn zmq_forwarding_thread(
outbound_notif_socket: Socket,
stdin_socket: Socket,
stdin_inbound_tx: Sender<crate::Result<Message>>,
iopub_socket: Socket,
iopub_inbound_tx: Sender<crate::Result<SubscriptionMessage>>,
outbound_rx: Receiver<OutboundMessage>,
) {
// This function checks for notifications that an outgoing message
Expand All @@ -394,8 +430,8 @@ fn zmq_forwarding_thread(
};

// This function checks that a 0MQ message from the frontend is ready.
let has_inbound = || -> bool {
match stdin_socket.socket.poll(zmq::POLLIN, 0) {
let has_inbound = |socket: &Socket| -> bool {
match socket.socket.poll(zmq::POLLIN, 0) {
Ok(n) if n > 0 => true,
_ => false,
}
Expand All @@ -409,6 +445,7 @@ fn zmq_forwarding_thread(
let outbound_msg = outbound_rx.recv()?;
match outbound_msg {
OutboundMessage::StdIn(msg) => msg.send(&stdin_socket)?,
OutboundMessage::IOPub(msg) => msg.send(&iopub_socket)?,
};

// Notify back
Expand All @@ -419,17 +456,28 @@ fn zmq_forwarding_thread(

// Forwards 0MQ message from the frontend to the corresponding
// Amalthea channel.
let forward_inbound = || -> anyhow::Result<()> {
let msg = Message::read_from_socket(&stdin_socket);
stdin_inbound_tx.send(msg)?;
let forward_inbound =
|socket: &Socket, inbound_tx: &Sender<crate::Result<Message>>| -> anyhow::Result<()> {
let msg = Message::read_from_socket(socket);
inbound_tx.send(msg)?;
Ok(())
};

// Forwards special 0MQ XPUB subscription message from the frontend to the IOPub thread.
let forward_inbound_subscription = |socket: &Socket,
inbound_tx: &Sender<crate::Result<SubscriptionMessage>>|
-> anyhow::Result<()> {
let msg = SubscriptionMessage::read_from_socket(socket);
inbound_tx.send(msg)?;
Ok(())
Comment on lines +459 to 472
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need a special inbound helper here, since this is a special SubscriptionMessage not a typical Message. It has its own format that it reads off the wire

};

// Create poll items necessary to call `zmq_poll()`
let mut poll_items = {
let outbound_notif_poll_item = outbound_notif_socket.socket.as_poll_item(zmq::POLLIN);
let stdin_poll_item = stdin_socket.socket.as_poll_item(zmq::POLLIN);
vec![outbound_notif_poll_item, stdin_poll_item]
let iopub_poll_item = iopub_socket.socket.as_poll_item(zmq::POLLIN);
vec![outbound_notif_poll_item, stdin_poll_item, iopub_poll_item]
};

loop {
Expand All @@ -450,9 +498,17 @@ fn zmq_forwarding_thread(
continue;
}

if has_inbound() {
if has_inbound(&stdin_socket) {
unwrap!(
forward_inbound(&stdin_socket, &stdin_inbound_tx),
Err(err) => report_error!("While forwarding inbound message: {}", err)
);
continue;
}

if has_inbound(&iopub_socket) {
unwrap!(
forward_inbound(),
forward_inbound_subscription(&iopub_socket, &iopub_inbound_tx),
Err(err) => report_error!("While forwarding inbound message: {}", err)
);
continue;
Expand All @@ -463,8 +519,10 @@ fn zmq_forwarding_thread(
}
}

/// Starts the thread that notifies the forwarding thread that new
/// outgoing messages have arrived from Amalthea.
/// Starts the thread that notifies the forwarding thread that new outgoing
/// messages have arrived from Amalthea channels. This wakes up the forwarding
/// thread which will then pop the message from the channel and forward them to
/// the relevant zeromq socket.
fn zmq_notifier_thread(notif_socket: Socket, outbound_rx: Receiver<OutboundMessage>) {
let mut sel = Select::new();
sel.recv(&outbound_rx);
Expand Down
Loading
Loading