diff --git a/src/stream.rs b/src/stream.rs index c27c997..cb265cb 100644 --- a/src/stream.rs +++ b/src/stream.rs @@ -6,22 +6,23 @@ use std::{ task::{ready, Context, Poll}, }; -use futures_util::{FutureExt, Stream}; +use futures_util::{stream::FusedStream, FutureExt, Stream}; use kobject_uevent::UEvent; use netlink_sys::{ protocols::NETLINK_KOBJECT_UEVENT, AsyncSocket, AsyncSocketExt, SocketAddr, TokioSocket, }; -type FutureOutput = (TokioSocket, Result, io::Error>); +/// TODO: replace this with TAIT as soon it's stabilized +type UEventsFuture = Pin, io::Error>)>>>; #[derive(Debug, thiserror::Error)] pub enum Error { - #[error("Socket open error: {0}")] - Open(io::Error), - #[error("Socket bind error: {0}")] - Bind(io::Error), - #[error("Socket receive error: {0}")] - Receive(io::Error), + #[error("Socket open error")] + Open(#[source] io::Error), + #[error("Socket bind error")] + Bind(#[source] io::Error), + #[error("Socket receive error")] + Receive(#[source] io::Error), #[error(transparent)] NetlinkPacket(kobject_uevent::Error), } @@ -37,12 +38,12 @@ pub fn uevents() -> Result>, Error> { enum UEventsStream { Socket(TokioSocket), - Future(Pin>>), + Future(UEventsFuture), None, } impl UEventsStream { - pub fn new(socket: TokioSocket) -> Self { + fn new(socket: TokioSocket) -> Self { Self::Socket(socket) } @@ -77,7 +78,7 @@ impl Stream for UEventsStream { match res { Ok(buf) => { if buf.is_empty() { - return Poll::Ready(None); + *this = Self::None; } else { return Poll::Ready(Some( UEvent::from_netlink_packet(&buf).map_err(Error::NetlinkPacket), @@ -90,6 +91,16 @@ impl Stream for UEventsStream { } } - Poll::Pending + if matches!(this, Self::None) { + Poll::Ready(None) + } else { + unreachable!(); + } + } +} + +impl FusedStream for UEventsStream { + fn is_terminated(&self) -> bool { + matches!(self, Self::None) } }