Skip to content

Commit

Permalink
Implement JEP 65 (#577)
Browse files Browse the repository at this point in the history
Closes #569

This PR fixes a race condition regarding subscriptions to IOPub that causes clients to miss IOPub messages:

- On startup a client connects to the server sockets of a kernel.
- The client sends a request on Shell.
- The kernel starts processing the request and emits busy on IOPub.

If the client hasn't been able to fully subscribe to IOPub, messages can be lost, in particular the Busy message that encloses the request output.

On the Positron side we fixed it by sending kernel-info requests in a loop until we get a Ready message on IOPub. This signals Positron that the kernel is fully connected and in the Ready state: posit-dev/positron#2207. We haven't implemented a similar fix in our dummy clients for integration tests and we believe this is what is causing the race condition described in #569.

As noted in posit-dev/positron#2207, there is an accepted JEP proposal (JEP 65) that aims at solving this problem by switching to XPUB.

https://jupyter.org/enhancement-proposals/65-jupyter-xpub/jupyter-xpub.html
jupyter/enhancement-proposals#65

The XPUB socket allows the server to get notified of all new subscriptions. A message of type `iopub_welcome` is sent to all connected clients. They should generally ignore it but clients that have just started up can use it as a cue that IOPub is correctly connected and that they won't miss any output from that point on.

Approach:

The subscription notification comes in as a message on the IOPub socket. This is problematic because the IOPub thread now needs to listens to its crossbeam channel and to the 0MQ socket at the same time, which isn't possible without resorting to timeout polling. So we use the same approach and infrastructure that we implemented in #58 for listeing to both input replies on the StdIn socket and interrupt notifications on a crossbeam channel. The forwarding thread now owns the IOPub socket and listens for subscription notifications and fowrards IOPub messages coming from the kernel components.

---

* Start moving IOPub messages to forwarding thread

* Remove unused import

* Resolve the roundabout `Message` problem

The solution was to move the conversion to `JupyterMessage<T>` up into the match, so we "know" what `T` is!

* Use correct `Welcome` `MessageType`

* Implement `SubscriptionMessage` support and switch to `XPUB`

* The `Welcome` message doesn't come from ark

* Use `amalthea::Result`

* Add more comments

---------

Co-authored-by: Davis Vaughan <[email protected]>
Co-authored-by: Lionel Henry <[email protected]>
  • Loading branch information
3 people committed Oct 10, 2024
1 parent 8194c2a commit f2fa635
Show file tree
Hide file tree
Showing 9 changed files with 466 additions and 160 deletions.
5 changes: 4 additions & 1 deletion crates/amalthea/src/comm/comm_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use crate::comm::event::CommManagerRequest;
use crate::socket::comm::CommInitiator;
use crate::socket::comm::CommSocket;
use crate::socket::iopub::IOPubMessage;
use crate::wire::comm_close::CommClose;
use crate::wire::comm_msg::CommWireMsg;
use crate::wire::comm_open::CommOpen;
use crate::wire::header::JupyterHeader;
Expand Down Expand Up @@ -245,7 +246,9 @@ impl CommManager {
}
},

CommMsg::Close => IOPubMessage::CommClose(comm_socket.comm_id.clone()),
CommMsg::Close => IOPubMessage::CommClose(CommClose {
comm_id: comm_socket.comm_id.clone(),
}),
};

// Deliver the message to the frontend
Expand Down
27 changes: 14 additions & 13 deletions crates/amalthea/src/fixtures/dummy_frontend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,10 +162,6 @@ impl DummyFrontend {
)
.unwrap();

// Subscribe to IOPub! Server is the one that sent us this port,
// so its already connected on its end.
iopub_socket.subscribe().unwrap();

let stdin_socket = Socket::new(
connection.session.clone(),
connection.ctx.clone(),
Expand All @@ -186,14 +182,19 @@ impl DummyFrontend {
)
.unwrap();

// TODO!: Without this sleep, `IOPub` `Busy` messages sporadically
// don't arrive when running integration tests. I believe this is a result
// of PUB sockets dropping messages while in a "mute" state (i.e. no subscriber
// connected yet). Even though we run `iopub_socket.subscribe()` to subscribe,
// it seems like we can return from this function even before our socket
// has fully subscribed, causing messages to get dropped.
// https://libzmq.readthedocs.io/en/latest/zmq_socket.html
std::thread::sleep(std::time::Duration::from_millis(500));
// Subscribe to IOPub! Server's XPUB socket will receive a notification of
// our subscription with `subscription`, then will publish an IOPub `Welcome`
// message, sending back our `subscription`.
iopub_socket.subscribe(b"").unwrap();

// Immediately block until we've received the IOPub welcome message.
// This confirms that we've fully subscribed and avoids dropping any
// of the initial IOPub messages that a server may send if we start
// perform requests immediately.
// https://github.com/posit-dev/ark/pull/577
assert_matches!(Self::recv(&iopub_socket), Message::Welcome(data) => {
assert_eq!(data.content.subscription, String::from(""));
});

Self {
_control_socket,
Expand Down Expand Up @@ -347,7 +348,7 @@ impl DummyFrontend {
let msg = self.recv_iopub();

// Assert its type
let piece = assert_matches!(msg, Message::StreamOutput(data) => {
let piece = assert_matches!(msg, Message::Stream(data) => {
assert_eq!(data.content.name, stream);
data.content.text
});
Expand Down
90 changes: 74 additions & 16 deletions crates/amalthea/src/kernel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ use crate::wire::jupyter_message::JupyterMessage;
use crate::wire::jupyter_message::Message;
use crate::wire::jupyter_message::OutboundMessage;
use crate::wire::jupyter_message::Status;
use crate::wire::subscription_message::SubscriptionMessage;

macro_rules! report_error {
($($arg:tt)+) => (if cfg!(debug_assertions) { panic!($($arg)+) } else { log::error!($($arg)+) })
Expand Down Expand Up @@ -118,20 +119,25 @@ pub fn connect(
)
});

// Create the IOPub PUB/SUB socket and start a thread to broadcast to
// Create the IOPub XPUB/SUB socket and start a thread to broadcast to
// the client. IOPub only broadcasts messages, so it listens to other
// threads on a Receiver<Message> instead of to the client.
let iopub_socket = Socket::new(
session.clone(),
ctx.clone(),
String::from("IOPub"),
zmq::PUB,
zmq::XPUB,
None,
connection_file.endpoint(connection_file.iopub_port),
)?;
let iopub_port = port_finalize(&iopub_socket, connection_file.iopub_port)?;

let (iopub_inbound_tx, iopub_inbound_rx) = unbounded();
let iopub_session = iopub_socket.session.clone();
let iopub_outbound_tx = outbound_tx.clone();

spawn!(format!("{name}-iopub"), move || {
iopub_thread(iopub_socket, iopub_rx)
iopub_thread(iopub_rx, iopub_inbound_rx, iopub_outbound_tx, iopub_session)
});

// Create the heartbeat socket and start a thread to listen for
Expand Down Expand Up @@ -165,11 +171,12 @@ pub fn connect(
let (stdin_inbound_tx, stdin_inbound_rx) = unbounded();
let (stdin_interrupt_tx, stdin_interrupt_rx) = bounded(1);
let stdin_session = stdin_socket.session.clone();
let stdin_outbound_tx = outbound_tx.clone();

spawn!(format!("{name}-stdin"), move || {
stdin_thread(
stdin_inbound_rx,
outbound_tx,
stdin_outbound_tx,
stdin_request_rx,
stdin_reply_tx,
stdin_interrupt_rx,
Expand Down Expand Up @@ -224,6 +231,8 @@ pub fn connect(
outbound_notif_socket_rx,
stdin_socket,
stdin_inbound_tx,
iopub_socket,
iopub_inbound_tx,
outbound_rx_clone,
)
});
Expand Down Expand Up @@ -338,8 +347,13 @@ fn shell_thread(
}

/// Starts the IOPub thread.
fn iopub_thread(socket: Socket, receiver: Receiver<IOPubMessage>) -> Result<(), Error> {
let mut iopub = IOPub::new(socket, receiver);
fn iopub_thread(
rx: Receiver<IOPubMessage>,
inbound_rx: Receiver<crate::Result<SubscriptionMessage>>,
outbound_tx: Sender<OutboundMessage>,
session: Session,
) -> Result<(), Error> {
let mut iopub = IOPub::new(rx, inbound_rx, outbound_tx, session);
iopub.listen();
Ok(())
}
Expand Down Expand Up @@ -367,10 +381,32 @@ fn stdin_thread(

/// Starts the thread that forwards 0MQ messages to Amalthea channels
/// and vice versa.
///
/// This is a solution to the problem of polling/selecting from 0MQ sockets and
/// crossbeam channels at the same time. Message events on crossbeam channels
/// are emitted by the notifier thread (see below) on a 0MQ socket. The
/// forwarding thread is then able to listen on 0MQ sockets (e.g. StdIn replies
/// and IOPub subscriptions) and the notification socket at the same time.
///
/// Part of the problem this setup solves is that 0MQ sockets can only be owned
/// by one thread at a time. Take IOPUb as an example: we need to listen on that
/// socket for subscription events. We also need to listen for new IOPub
/// messages to send to the client, sent via Crossbeam channels. So we need at
/// least two threads listening for these two different kinds of events. But the
/// forwarding thread has to fully own the socket to be able to listen to it. So
/// it's also in charge of sending IOPub messages on that socket. When an IOPub
/// message comes in, the notifier thread wakes up the forwarding thread which
/// then pulls messages from the channel and forwards them to the IOPub socket.
///
/// Terminology:
/// - Outbound means that a crossbeam message needs to be forwarded to a 0MQ socket.
/// - Inbound means that a 0MQ message needs to be forwarded to a crossbeam channel.
fn zmq_forwarding_thread(
outbound_notif_socket: Socket,
stdin_socket: Socket,
stdin_inbound_tx: Sender<crate::Result<Message>>,
iopub_socket: Socket,
iopub_inbound_tx: Sender<crate::Result<SubscriptionMessage>>,
outbound_rx: Receiver<OutboundMessage>,
) {
// This function checks for notifications that an outgoing message
Expand All @@ -394,8 +430,8 @@ fn zmq_forwarding_thread(
};

// This function checks that a 0MQ message from the frontend is ready.
let has_inbound = || -> bool {
match stdin_socket.socket.poll(zmq::POLLIN, 0) {
let has_inbound = |socket: &Socket| -> bool {
match socket.socket.poll(zmq::POLLIN, 0) {
Ok(n) if n > 0 => true,
_ => false,
}
Expand All @@ -409,6 +445,7 @@ fn zmq_forwarding_thread(
let outbound_msg = outbound_rx.recv()?;
match outbound_msg {
OutboundMessage::StdIn(msg) => msg.send(&stdin_socket)?,
OutboundMessage::IOPub(msg) => msg.send(&iopub_socket)?,
};

// Notify back
Expand All @@ -419,17 +456,28 @@ fn zmq_forwarding_thread(

// Forwards 0MQ message from the frontend to the corresponding
// Amalthea channel.
let forward_inbound = || -> anyhow::Result<()> {
let msg = Message::read_from_socket(&stdin_socket);
stdin_inbound_tx.send(msg)?;
let forward_inbound =
|socket: &Socket, inbound_tx: &Sender<crate::Result<Message>>| -> anyhow::Result<()> {
let msg = Message::read_from_socket(socket);
inbound_tx.send(msg)?;
Ok(())
};

// Forwards special 0MQ XPUB subscription message from the frontend to the IOPub thread.
let forward_inbound_subscription = |socket: &Socket,
inbound_tx: &Sender<crate::Result<SubscriptionMessage>>|
-> anyhow::Result<()> {
let msg = SubscriptionMessage::read_from_socket(socket);
inbound_tx.send(msg)?;
Ok(())
};

// Create poll items necessary to call `zmq_poll()`
let mut poll_items = {
let outbound_notif_poll_item = outbound_notif_socket.socket.as_poll_item(zmq::POLLIN);
let stdin_poll_item = stdin_socket.socket.as_poll_item(zmq::POLLIN);
vec![outbound_notif_poll_item, stdin_poll_item]
let iopub_poll_item = iopub_socket.socket.as_poll_item(zmq::POLLIN);
vec![outbound_notif_poll_item, stdin_poll_item, iopub_poll_item]
};

loop {
Expand All @@ -450,9 +498,17 @@ fn zmq_forwarding_thread(
continue;
}

if has_inbound() {
if has_inbound(&stdin_socket) {
unwrap!(
forward_inbound(&stdin_socket, &stdin_inbound_tx),
Err(err) => report_error!("While forwarding inbound message: {}", err)
);
continue;
}

if has_inbound(&iopub_socket) {
unwrap!(
forward_inbound(),
forward_inbound_subscription(&iopub_socket, &iopub_inbound_tx),
Err(err) => report_error!("While forwarding inbound message: {}", err)
);
continue;
Expand All @@ -463,8 +519,10 @@ fn zmq_forwarding_thread(
}
}

/// Starts the thread that notifies the forwarding thread that new
/// outgoing messages have arrived from Amalthea.
/// Starts the thread that notifies the forwarding thread that new outgoing
/// messages have arrived from Amalthea channels. This wakes up the forwarding
/// thread which will then pop the message from the channel and forward them to
/// the relevant zeromq socket.
fn zmq_notifier_thread(notif_socket: Socket, outbound_rx: Receiver<OutboundMessage>) {
let mut sel = Select::new();
sel.recv(&outbound_rx);
Expand Down
Loading

0 comments on commit f2fa635

Please sign in to comment.