I've been using qapi for communication with QEMU. What I have done is:
let stream = qapi::futures::QmpStreamTokio::open_uds(&socket_addr).await?;
let stream = stream.negotiate().await?;
let (service, events) = stream.into_parts();
This gives me a QapiEvents<S> instance (the `events` half).
I then polled it inside tokio::select!:
let qemu_exit_status: Result<ExitStatus> = loop {
    tokio::select! {
        // Wait for the QEMU process to exit
        result = self.child.wait() => {
            info!("{MONITOR_MSG_HEADER}QEMU process terminated");
            match result {
                Ok(exit_status) => {
                    // QEMU process has exited, sync one last time and return
                    break Ok(exit_status)
                }
                Err(e) => {
                    // Error waiting for QEMU process, return the error.
                    break Err(e.into())
                }
            }
        }
        // Monitor events sent by QEMU
        Some(event) = self.qmp.events.next() => {
            let event = match event {
                Ok(event) => { event }
                Err(e) => {
                    error!("{MONITOR_MSG_HEADER}{e}");
                    errs.push(e.into());
                    continue;
                }
            };
            // TODO: handle events
            info!("{MONITOR_MSG_HEADER}got event {:?}", event);
        }
        _ = interval.tick() => {
            heartbeat_sent += 1;
            if heartbeat_sent == 10 {
                info!("{MONITOR_MSG_HEADER}Sent 10 heartbeat");
                heartbeat_sent = 0;
            }
        }
    }
};
After that, the tokio::select! stopped working: the "Sent 10 heartbeat" message never appears in the log file, which means the whole tokio::select! failed, not only the events branch.
Then I searched in the source file and found:
// In `qapi::futures`
#[cfg(feature = "qapi-qmp")]
impl<S: Stream<Item=io::Result<QmpMessageAny>>> Stream for QapiEvents<S> {
    type Item = io::Result<qapi_qmp::Event>;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let this = unsafe { self.get_unchecked_mut() };
        let stream = unsafe { Pin::new_unchecked(&mut this.stream) };
        let shared = &this.shared;
        shared.poll_next(cx, |cx| Poll::Ready(match futures::ready!(stream.poll_next(cx)) {
            None => None, // eof
            Some(Err(e)) => Some(Err(e)),
            Some(Ok(QmpMessage::Event(e))) => Some(Ok(e)),
            Some(Ok(QmpMessage::Response(res))) => match handle_response(shared, res) {
                Err(e) => Some(Err(e)),
                Ok(()) => {
                    cx.waker().wake_by_ref(); // TODO: I've seen this not work with tokio?
                    return Poll::Pending
                },
            },
        }))
    }
}
TODO: I've seen this not work with tokio?
Yeah now I've seen this as well!
Do you have any plan to work on this bug? @arcnmx
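For discussion, here is a minimal std-only sketch of one possible alternative to the `wake_by_ref()` + `Poll::Pending` pattern quoted above: keep polling the inner stream in a loop until an event, end of stream, or a real `Pending` shows up. Everything in it (`Message`, `Inner`, `Events`, `CountingWaker`, `drain_events`) is a hypothetical stand-in and not part of qapi. I have not verified that the self-wake is what actually stalls the `select!`, so this only illustrates the idea and is not a confirmed fix.

```rust
use std::collections::VecDeque;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical stand-in for QMP traffic: command responses mixed with events.
#[derive(Debug)]
enum Message {
    Response(u32),
    Event(&'static str),
}

/// Hypothetical inner stream: always ready until its queue is drained.
struct Inner {
    queue: VecDeque<Message>,
}

impl Inner {
    fn poll_next(&mut self, _cx: &mut Context<'_>) -> Poll<Option<Message>> {
        Poll::Ready(self.queue.pop_front())
    }
}

/// Event filter that loops past responses instead of self-waking.
struct Events {
    inner: Inner,
    responses_handled: usize,
}

impl Events {
    fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<&'static str>> {
        loop {
            match self.inner.poll_next(cx) {
                // Only return Pending when the inner stream did, so its waker is registered.
                Poll::Pending => return Poll::Pending,
                Poll::Ready(None) => return Poll::Ready(None), // eof
                Poll::Ready(Some(Message::Event(e))) => return Poll::Ready(Some(e)),
                Poll::Ready(Some(Message::Response(_))) => {
                    // Stand-in for handling the response, then poll again right away.
                    self.responses_handled += 1;
                }
            }
        }
    }
}

/// Waker that only counts how often it is woken.
struct CountingWaker(AtomicUsize);

impl Wake for CountingWaker {
    fn wake(self: Arc<Self>) {
        self.0.fetch_add(1, Ordering::SeqCst);
    }
}

/// Polls `Events` by hand and returns (events seen, responses handled, wake count).
fn drain_events(messages: Vec<Message>) -> (Vec<&'static str>, usize, usize) {
    let counter = Arc::new(CountingWaker(AtomicUsize::new(0)));
    let waker = Waker::from(counter.clone());
    let mut cx = Context::from_waker(&waker);
    let mut events = Events {
        inner: Inner { queue: messages.into() },
        responses_handled: 0,
    };
    let mut seen = Vec::new();
    while let Poll::Ready(Some(event)) = events.poll_next(&mut cx) {
        seen.push(event);
    }
    (seen, events.responses_handled, counter.0.load(Ordering::SeqCst))
}
```

The trade-off is that a long run of responses is handled inside a single `poll_next` call, so the loop itself never yields to the executor. Whether that is acceptable depends on how many responses can queue up between events.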