Skip to content

Commit

Permalink
feat: add fused stream implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
nappa85 committed Dec 31, 2024
1 parent 260a87f commit dc3ec78
Showing 1 changed file with 23 additions and 12 deletions.
35 changes: 23 additions & 12 deletions src/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,22 +6,23 @@ use std::{
task::{ready, Context, Poll},
};

use futures_util::{FutureExt, Stream};
use futures_util::{stream::FusedStream, FutureExt, Stream};
use kobject_uevent::UEvent;
use netlink_sys::{
protocols::NETLINK_KOBJECT_UEVENT, AsyncSocket, AsyncSocketExt, SocketAddr, TokioSocket,
};

type FutureOutput = (TokioSocket, Result<Vec<u8>, io::Error>);
/// TODO: replace this with TAIT as soon it's stabilized
type UEventsFuture = Pin<Box<dyn Future<Output = (TokioSocket, Result<Vec<u8>, io::Error>)>>>;

#[derive(Debug, thiserror::Error)]
pub enum Error {
#[error("Socket open error: {0}")]
Open(io::Error),
#[error("Socket bind error: {0}")]
Bind(io::Error),
#[error("Socket receive error: {0}")]
Receive(io::Error),
#[error("Socket open error")]
Open(#[source] io::Error),
#[error("Socket bind error")]
Bind(#[source] io::Error),
#[error("Socket receive error")]
Receive(#[source] io::Error),
#[error(transparent)]
NetlinkPacket(kobject_uevent::Error),
}
Expand All @@ -37,12 +38,12 @@ pub fn uevents() -> Result<impl Stream<Item = Result<UEvent, Error>>, Error> {

enum UEventsStream {
Socket(TokioSocket),
Future(Pin<Box<dyn Future<Output = FutureOutput>>>),
Future(UEventsFuture),
None,
}

impl UEventsStream {
pub fn new(socket: TokioSocket) -> Self {
fn new(socket: TokioSocket) -> Self {
Self::Socket(socket)
}

Expand Down Expand Up @@ -77,7 +78,7 @@ impl Stream for UEventsStream {
match res {
Ok(buf) => {
if buf.is_empty() {
return Poll::Ready(None);
*this = Self::None;
} else {
return Poll::Ready(Some(
UEvent::from_netlink_packet(&buf).map_err(Error::NetlinkPacket),
Expand All @@ -90,6 +91,16 @@ impl Stream for UEventsStream {
}
}

Poll::Pending
if matches!(this, Self::None) {
Poll::Ready(None)
} else {
unreachable!();
}
}
}

impl FusedStream for UEventsStream {
fn is_terminated(&self) -> bool {
matches!(self, Self::None)
}
}

0 comments on commit dc3ec78

Please sign in to comment.