-
Notifications
You must be signed in to change notification settings - Fork 15
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Implement JEP 65 #577
Implement JEP 65 #577
Conversation
let forward_inbound = | ||
|socket: &Socket, inbound_tx: &Sender<crate::Result<Message>>| -> anyhow::Result<()> { | ||
let msg = Message::read_from_socket(socket); | ||
inbound_tx.send(msg)?; | ||
Ok(()) | ||
}; | ||
|
||
// Forwards special 0MQ XPUB subscription message from the frontend to the IOPub thread. | ||
let forward_inbound_subscription = |socket: &Socket, | ||
inbound_tx: &Sender<crate::Result<SubscriptionMessage>>| | ||
-> anyhow::Result<()> { | ||
let msg = SubscriptionMessage::read_from_socket(socket); | ||
inbound_tx.send(msg)?; | ||
Ok(()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We need a special inbound helper here, since this is a special SubscriptionMessage
not a typical Message
. It has its own format that it reads off the wire
/// Parse a SubscriptionMessage from an array of buffers (from a ZeroMQ message) | ||
/// | ||
/// Always a single frame (i.e. `bufs` should be length 1). | ||
/// Either `1{subscription}` for subscription. | ||
/// Or `0{subscription}` for unsubscription. | ||
fn from_buffers(bufs: Vec<Vec<u8>>) -> Result<SubscriptionMessage, Error> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This was fun
// https://github.com/jupyter/jupyter_kernel_test/blob/5f2c65271b48dc95fc75a9585cb1d6db0bb55557/jupyter_kernel_test/__init__.py#L449-L450 | ||
impl MessageType for Welcome { | ||
fn message_type() -> String { | ||
String::from("iopub_welcome") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
"iopub_welcome"
is the name recommended by the JEP
Co-authored-by: Lionel Henry [email protected] Co-authored-by: Davis Vaughan [email protected]
The solution was to move the conversion to `JupyterMessage<T>` up into the match, so we "know" what `T` is!
bcc724a
to
77faa7b
Compare
Closes #569
This PR fixes a race condition regarding subscriptions to IOPub that causes clients to miss IOPub messages:
If the client hasn't been able to fully subscribe to IOPub, messages can be lost, in particular the Busy message that encloses the request output.
On the Positron side we fixed it by sending kernel-info requests in a loop until we get a Ready message on IOPub. This signals Positron that the kernel is fully connected and in the Ready state: posit-dev/positron#2207. We haven't implemented a similar fix in our dummy clients for integration tests and we believe this is what is causing the race condition described in #569.
As noted in posit-dev/positron#2207, there is an accepted JEP proposal (JEP 65) that aims at solving this problem by switching to XPUB.
https://jupyter.org/enhancement-proposals/65-jupyter-xpub/jupyter-xpub.html
jupyter/enhancement-proposals#65
The XPUB socket allows the server to get notified of all new subscriptions. A message of type
iopub_welcome
is sent to all connected clients. They should generally ignore it but clients that have just started up can use it as a cue that IOPub is correctly connected and that they won't miss any output from that point on.Approach:
The subscription notification comes in as a message on the IOPub socket. This is problematic because the IOPub thread now needs to listens to its crossbeam channel and to the 0MQ socket at the same time, which isn't possible without resorting to timeout polling. So we use the same approach and infrastructure that we implemented in #58 for listeing to both input replies on the StdIn socket and interrupt notifications on a crossbeam channel. The forwarding thread now owns the IOPub socket and listens for subscription notifications and fowrards IOPub messages coming from the kernel components.
Co-authored-by: Davis Vaughan [email protected]
Co-authored-by: Lionel Henry [email protected]