Wait for messages in the error queue #1672

Closed
folkertdev opened this issue May 18, 2023 · 6 comments

@folkertdev

I work on ntpd-rs, an NTP implementation in Rust. A big part of how it works is configuring a UDP socket to record timestamps when a message is sent or received, and then retrieving those timestamps. Based on the documentation, the timestamp is not available immediately (we've observed this for external hardware clocks; software timestamps do seem to be effectively instant).
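
For context, a simplified sketch of how a socket can be configured for this (software timestamping only; the exact flags are illustrative, not our precise configuration):

use std::os::unix::io::AsRawFd;

/// Ask the kernel to generate software send/receive timestamps.
fn enable_software_timestamping(socket: &std::net::UdpSocket) -> std::io::Result<()> {
    let flags: libc::c_uint = libc::SOF_TIMESTAMPING_SOFTWARE
        | libc::SOF_TIMESTAMPING_TX_SOFTWARE
        | libc::SOF_TIMESTAMPING_RX_SOFTWARE;

    let ret = unsafe {
        libc::setsockopt(
            socket.as_raw_fd(),
            libc::SOL_SOCKET,
            libc::SO_TIMESTAMPING,
            &flags as *const _ as *const libc::c_void,
            std::mem::size_of_val(&flags) as libc::socklen_t,
        )
    };

    if ret == -1 {
        return Err(std::io::Error::last_os_error());
    }
    Ok(())
}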

These timestamps end up in the socket's error queue (read with libc::MSG_ERRQUEUE). Tokio currently has no way to await or read these messages. Our fix for the time being is to configure an extra file descriptor that becomes readable when the UDP socket encounters an EPOLLERR event.

#[cfg(target_os = "linux")]
pub(crate) mod err_queue_waiter {

    use std::os::unix::prelude::{AsRawFd, RawFd};

    use tokio::io::{unix::AsyncFd, Interest};

    use crate::raw_socket::cerr;

    pub struct ErrQueueWaiter {
        epoll_fd: AsyncFd<RawFd>,
    }

    fn create_error(inner: std::io::Error) -> std::io::Error {
        std::io::Error::new(
            std::io::ErrorKind::Other,
            format!("could not create error queue waiter epoll socket: {inner:?}"),
        )
    }

    impl ErrQueueWaiter {
        pub fn new(source: &impl AsRawFd) -> std::io::Result<Self> {
            // Safety: epoll_create is safe to call; its size argument is
            // ignored since Linux 2.6.8 but must be greater than zero.
            let epoll = cerr(unsafe { libc::epoll_create(1) }).map_err(create_error)?;

            let mut ev = libc::epoll_event {
                events: libc::EPOLLERR as _,
                u64: 0,
            };

            cerr(unsafe {
                libc::epoll_ctl(
                    epoll,
                    libc::EPOLL_CTL_ADD,
                    source.as_raw_fd(),
                    &mut ev as *mut _,
                )
            })
            .map_err(create_error)?;

            Ok(Self {
                epoll_fd: AsyncFd::new(epoll)?,
            })
        }

        pub async fn wait(&self) -> std::io::Result<()> {
            self.epoll_fd
                .async_io(Interest::READABLE, |fd| {
                    let mut ev = libc::epoll_event { events: 0, u64: 0 };

                    // timeout of 0: a non-blocking check of the epoll fd
                    match unsafe { libc::epoll_wait(*fd, &mut ev as *mut _, 1, 0) } {
                        // nothing ready yet; WouldBlock makes async_io re-arm
                        0 => Err(std::io::ErrorKind::WouldBlock.into()),
                        -1 => Err(std::io::Error::last_os_error()),
                        _ => Ok(()),
                    }
                })
                .await
        }
    }
}

full code here

We'd like to add primitives to mio and tokio to eliminate this unsafe code from our repository. A first step would be to be able to await messages in the error queue, which I think requires exposing the EPOLLERR event.

PR #1647 adds Interest::PRIORITY. I'd like to make Interest::ERROR available in much the same way; a sketch of the shape I have in mind is below.
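
For concreteness, the registration I have in mind (hypothetical: mio has no Interest::ERROR today, and Token(0) is just a placeholder):

use mio::net::UdpSocket;
use mio::{Interest, Poll, Token};

fn register_for_errors(poll: &Poll, socket: &mut UdpSocket) -> std::io::Result<()> {
    // combine interests just like Interest::PRIORITY from #1647 combines today
    poll.registry()
        .register(socket, Token(0), Interest::READABLE | Interest::ERROR)
}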

Does that seem reasonable? I'd be happy to take on the actual implementation work.

@Thomasdezeeuw (Collaborator)

Per the epoll_ctl(2) docs:

 EPOLLERR
         Error condition happened on the associated file
         descriptor.  This event is also reported for the write end
         of a pipe when the read end has been closed.

         epoll_wait(2) will always report for this event; it is not
         necessary to set it in events when calling epoll_ctl().

So Interest::ERROR wouldn't be required, as the error event is always reported. I believe the same is true on other platforms.

However, I don't think you need a separate file descriptor for this. Looking through ip(7) and udp(7), you can set IP_RECVERR on the UDP socket and then call recvmsg(2) with MSG_ERRQUEUE. I'm not exactly sure which Mio event would be triggered (I imagine readable and error), but you'll have to test that for yourself.
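
Something along these lines (untested sketch; buffer sizes are arbitrary, and real code would walk the cmsg headers for the SCM_TIMESTAMPING control message instead of just returning the byte count):

use std::os::unix::io::AsRawFd;

/// Ask the kernel to queue extended errors for this socket.
fn set_recverr(socket: &std::net::UdpSocket) -> std::io::Result<()> {
    let on: libc::c_int = 1;
    let ret = unsafe {
        libc::setsockopt(
            socket.as_raw_fd(),
            libc::IPPROTO_IP,
            libc::IP_RECVERR,
            &on as *const _ as *const libc::c_void,
            std::mem::size_of_val(&on) as libc::socklen_t,
        )
    };
    if ret == -1 {
        return Err(std::io::Error::last_os_error());
    }
    Ok(())
}

/// Try to read one message from the error queue; fails with WouldBlock
/// while the queue is still empty.
fn recv_err_queue(socket: &std::net::UdpSocket) -> std::io::Result<usize> {
    let mut buf = [0u8; 2048];
    let mut control = [0u8; 256];

    let mut iov = libc::iovec {
        iov_base: buf.as_mut_ptr() as *mut libc::c_void,
        iov_len: buf.len(),
    };
    let mut msg: libc::msghdr = unsafe { std::mem::zeroed() };
    msg.msg_iov = &mut iov;
    msg.msg_iovlen = 1;
    msg.msg_control = control.as_mut_ptr() as *mut libc::c_void;
    msg.msg_controllen = control.len();

    match unsafe { libc::recvmsg(socket.as_raw_fd(), &mut msg, libc::MSG_ERRQUEUE) } {
        -1 => Err(std::io::Error::last_os_error()),
        n => Ok(n as usize),
    }
}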

@folkertdev (Author)

You're right that the second file descriptor is not technically needed, but I think it's currently the only way to make this mechanism work with tokio. Unless I'm mistaken, tokio would need an Interest::ERROR in order to await this event in a targeted way.

Retrieving the content of the error message works as you say, and is not a problem at all. It's really that we must wait for the socket to report that the timestamp is available.

These are the relevant events as captured by strace:

// Poll::new()
epoll_create1(EPOLL_CLOEXEC)            = 3
// UdpSocket::bind 
socket(AF_INET, SOCK_DGRAM|SOCK_CLOEXEC|SOCK_NONBLOCK, IPPROTO_IP) = 4
bind(4, {sa_family=AF_INET, sin_port=htons(8012), sin_addr=inet_addr("127.0.0.1")}, 16) = 0
// UdpSocket::connect
connect(4, {sa_family=AF_INET, sin_port=htons(8013), sin_addr=inet_addr("127.0.0.1")}, 16) = 0
// setsockopt configuring timestamping
setsockopt(4, SOL_SOCKET, SO_TIMESTAMPING_OLD, [2178], 4) = 0
// poll.registry().register
epoll_ctl(3, EPOLL_CTL_ADD, 4, {EPOLLPRI|EPOLLET, {u32=0, u64=0}}) = 0
// socket.send
sendto(4, "abcde", 5, MSG_NOSIGNAL, NULL, 0) = 5
// poll.poll 
epoll_wait(3, [{EPOLLERR, {u32=0, u64=0}}], 1, -1) = 1

So just to confirm: to your knowledge, mio today provides everything required for tokio to support awaiting on messages arriving into the error queue?

@Thomasdezeeuw (Collaborator)

> You're right that the second file descriptor is not technically needed, but I think it's currently the only way to make this mechanism work with tokio. Unless I'm mistaken, tokio would need an Interest::ERROR in order to await this event in a targeted way.

I'm not sure about Tokio, but for Mio the "error" interest is always registered (by the OS) and can be retrieved using Event::is_error. I don't know how Tokio should expose this (Interest::ERROR doesn't really make a lot of sense if you can't actually register for error events, or rather can't not register for them). @Darksonn any thoughts on this? (This should perhaps be moved to the Tokio issue tracker.)
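
For example, something like this on the mio side (assuming the socket was registered earlier under Token(0)):

use mio::{Events, Poll, Token};

fn wait_for_error(poll: &mut Poll, events: &mut Events) -> std::io::Result<bool> {
    // block until the poller delivers something for our socket
    poll.poll(events, None)?;
    for event in events.iter() {
        // the error bit arrives regardless of which interests were registered
        if event.token() == Token(0) && event.is_error() {
            // e.g. a timestamp landed in the error queue
            return Ok(true);
        }
    }
    Ok(false)
}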


> So just to confirm: to your knowledge, mio today provides everything required for tokio to support awaiting on messages arriving into the error queue?

I think so. The EPOLLERR event returned by Poll::poll/epoll_wait should be exposed by Tokio in some way.

@folkertdev (Author)

I did some more digging. I think tokio-rs/tokio#5566, if/when it lands, provides a bunch of useful features. In particular, it allows this pattern on file descriptors:

let guard = async_fd.ready(Interest::PRIORITY).await.unwrap();
dbg!(guard.ready());

With some hackery, I can indeed get an error event to show:

[src/main.rs:90] guard.ready() = Ready {
    is_readable: false,
    is_writable: false,
    is_read_closed: false,
    is_write_closed: false,
    is_priority_ready: false,
    is_error: true,
}

However, there isn't really a good way to await error readiness that I can see.

In tokio, to await something you need to provide an interest. Today this is defined as:

#[derive(Clone, Copy, Eq, PartialEq)]
pub struct Interest(mio::Interest);

This interest is then turned into a ready value, and this ready value is matched against incoming events to see if anything matches:

pub(crate) fn from_interest(interest: Interest) -> Ready {
    let mut ready = Ready::EMPTY;

    if interest.is_readable() {
        ready |= Ready::READABLE;
        ready |= Ready::READ_CLOSED;
    }

    if interest.is_writable() {
        ready |= Ready::WRITABLE;
        ready |= Ready::WRITE_CLOSED;
    }

    #[cfg(any(target_os = "linux", target_os = "android"))]
    if interest.is_priority() {
        ready |= Ready::PRIORITY;
        ready |= Ready::READ_CLOSED;
    }

    // NOTE: this is a line I added
    ready |= Ready::ERROR;

    ready
}

The Ready::ERROR readiness can easily be added, and maps to the error bit of mio::event::Event. But now there is a mismatch between Ready and Interest: there is no Interest to specifically await error readiness. This is already the case for read/write closed, but in my case I specifically want to await error readiness.

So the above code always also awaits error readiness, but that means there is no targeted way to wait for just error readiness, and error readiness would be visible to users who just want read/write/priority readiness.

So, I'm not really sure what to do with that. Maybe there is a solution strictly on the tokio side, but it's not obvious to me; a hypothetical shape is sketched below.
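
For concreteness, the purely hypothetical shape such an API could take (neither Interest::ERROR nor a targeted Ready::ERROR mapping exists in tokio today):

use std::os::unix::io::AsRawFd;

use tokio::io::{unix::AsyncFd, Interest};

async fn await_error_readiness<T: AsRawFd>(fd: &AsyncFd<T>) -> std::io::Result<()> {
    // hypothetical: wake only when the poller reports an error event
    let mut guard = fd.ready(Interest::ERROR).await?;
    // clear readiness so the next call waits for a fresh event
    guard.clear_ready();
    Ok(())
}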

@Thomasdezeeuw (Collaborator)

> However, there isn't really a good way to await error readiness that I can see.

I think you should wait for read readiness; that should trigger on errors as well. But I'm not sure how the interaction between the two file descriptors (user space) works: epoll's triggering operates on file descriptions (kernel side), so there might be some weirdness when dealing with two file descriptors pointing to the same file description.
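
Something like this, perhaps (untested sketch reusing the hypothetical recv_err_queue helper from my earlier comment; it assumes the socket is already non-blocking and driven through a single AsyncFd registration, and whether read readiness actually wakes on the error event is exactly what you'd need to test):

use std::os::unix::io::AsRawFd;

use tokio::io::{unix::AsyncFd, Interest};

async fn wait_for_timestamp(socket: &std::net::UdpSocket) -> std::io::Result<usize> {
    let async_fd = AsyncFd::with_interest(socket.as_raw_fd(), Interest::READABLE)?;
    async_fd
        .async_io(Interest::READABLE, |_fd| {
            // recv_err_queue returns WouldBlock while the error queue is
            // empty, which makes async_io re-arm and wait again
            recv_err_queue(socket)
        })
        .await
}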

> The Ready::ERROR readiness can easily be added, and maps to the error bit of mio::event::Event. But now there is a mismatch between Ready and Interest: there is no Interest to specifically await error readiness. This is already the case for read/write closed, but in my case I specifically want to await error readiness.

I'm repeating myself here, but mio::Interest::ERROR doesn't make sense because it's always triggered; i.e., not supplying mio::Interest::ERROR (were it to exist) might confuse users when error events do show up.

You can try mio::Interest::PRIORITY or READABLE; those should also trigger error events.

@folkertdev (Author)

Right, I'll have to figure something out strictly on the tokio side then.

Thanks for your help here, and for all your work on mio!
