Skip to content

Commit

Permalink
m: Port to event-listener v5.0.0
Browse files Browse the repository at this point in the history
Signed-off-by: John Nunley <[email protected]>
  • Loading branch information
notgull authored Feb 7, 2024
1 parent 706f275 commit efdddaa
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 33 deletions.
4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ exclude = ["/.*"]

[dependencies]
concurrent-queue = { version = "2", default-features = false }
event-listener = { version = "4.0.0", default-features = false }
event-listener-strategy = { version = "0.4.0", default-features = false }
event-listener = { version = "5.0.0", default-features = false }
event-listener-strategy = { version = "0.5.0", default-features = false }
futures-core = { version = "0.3.5", default-features = false }
pin-project-lite = "0.2.11"

Expand Down
88 changes: 57 additions & 31 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ extern crate alloc;

use core::fmt;
use core::future::Future;
use core::marker::PhantomPinned;
use core::pin::Pin;
use core::sync::atomic::{AtomicUsize, Ordering};
use core::task::{Context, Poll};
Expand All @@ -52,6 +53,7 @@ use event_listener::{Event, EventListener};
use event_listener_strategy::{easy_wrapper, EventListenerFuture, Strategy};
use futures_core::ready;
use futures_core::stream::Stream;
use pin_project_lite::pin_project;

struct Channel<T> {
/// Inner message queue.
Expand Down Expand Up @@ -132,8 +134,9 @@ pub fn bounded<T>(cap: usize) -> (Sender<T>, Receiver<T>) {
channel: channel.clone(),
};
let r = Receiver {
listener: EventListener::new(),
listener: None,
channel,
_pin: PhantomPinned,
};
(s, r)
}
Expand Down Expand Up @@ -172,8 +175,9 @@ pub fn unbounded<T>() -> (Sender<T>, Receiver<T>) {
channel: channel.clone(),
};
let r = Receiver {
listener: EventListener::new(),
listener: None,
channel,
_pin: PhantomPinned,
};
(s, r)
}
Expand Down Expand Up @@ -247,7 +251,8 @@ impl<T> Sender<T> {
Send::_new(SendInner {
sender: self,
msg: Some(msg),
listener: EventListener::new(),
listener: None,
_pin: PhantomPinned,
})
}

Expand Down Expand Up @@ -477,7 +482,7 @@ impl<T> Clone for Sender<T> {
}
}

pin_project_lite::pin_project! {
pin_project! {
/// The receiving side of a channel.
///
/// Receivers can be cloned and shared among threads. When all receivers associated with a channel
Expand All @@ -491,8 +496,11 @@ pin_project_lite::pin_project! {
channel: Arc<Channel<T>>,

// Listens for a send or close event to unblock this stream.
listener: Option<EventListener>,

// Keeping this type `!Unpin` enables future optimizations.
#[pin]
listener: EventListener,
_pin: PhantomPinned
}

impl<T> PinnedDrop for Receiver<T> {
Expand Down Expand Up @@ -567,7 +575,8 @@ impl<T> Receiver<T> {
pub fn recv(&self) -> Recv<'_, T> {
Recv::_new(RecvInner {
receiver: self,
listener: EventListener::new(),
listener: None,
_pin: PhantomPinned,
})
}

Expand Down Expand Up @@ -787,7 +796,8 @@ impl<T> Clone for Receiver<T> {

Receiver {
channel: self.channel.clone(),
listener: EventListener::new(),
listener: None,
_pin: PhantomPinned,
}
}
}
Expand All @@ -800,8 +810,9 @@ impl<T> Stream for Receiver<T> {
// If this stream is listening for events, first wait for a notification.
{
let this = self.as_mut().project();
if this.listener.is_listening() {
ready!(this.listener.poll(cx));
if let Some(listener) = this.listener.as_mut() {
ready!(Pin::new(listener).poll(cx));
*this.listener = None;
}
}

Expand All @@ -810,26 +821,26 @@ impl<T> Stream for Receiver<T> {
match self.try_recv() {
Ok(msg) => {
// The stream is not blocked on an event - drop the listener.
let mut this = self.project();
this.listener.as_mut().set(EventListener::new());
let this = self.as_mut().project();
*this.listener = None;
return Poll::Ready(Some(msg));
}
Err(TryRecvError::Closed) => {
// The stream is not blocked on an event - drop the listener.
let mut this = self.project();
this.listener.as_mut().set(EventListener::new());
let this = self.as_mut().project();
*this.listener = None;
return Poll::Ready(None);
}
Err(TryRecvError::Empty) => {}
}

// Receiving failed - now start listening for notifications or wait for one.
let mut this = self.as_mut().project();
if this.listener.is_listening() {
let this = self.as_mut().project();
if this.listener.is_some() {
// Go back to the outer loop to wait for a notification.
break;
} else {
this.listener.as_mut().listen(&this.channel.stream_ops);
*this.listener = Some(this.channel.stream_ops.listen());
}
}
}
Expand Down Expand Up @@ -914,7 +925,8 @@ impl<T> WeakReceiver<T> {
}
Ok(_) => Some(Receiver {
channel: self.channel.clone(),
listener: EventListener::new(),
listener: None,
_pin: PhantomPinned,
}),
}
}
Expand Down Expand Up @@ -1084,13 +1096,22 @@ easy_wrapper! {
pub(crate) wait();
}

pin_project_lite::pin_project! {
pin_project! {
#[derive(Debug)]
#[project(!Unpin)]
struct SendInner<'a, T> {
// Reference to the original sender.
sender: &'a Sender<T>,

// The message to send.
msg: Option<T>,

// Listener waiting on the channel.
listener: Option<EventListener>,

// Keeping this type `!Unpin` enables future optimizations.
#[pin]
listener: EventListener,
_pin: PhantomPinned
}
}

Expand All @@ -1103,7 +1124,7 @@ impl<'a, T> EventListenerFuture for SendInner<'a, T> {
strategy: &mut S,
context: &mut S::Context,
) -> Poll<Result<(), SendError<T>>> {
let mut this = self.project();
let this = self.project();

loop {
let msg = this.msg.take().unwrap();
Expand All @@ -1115,11 +1136,11 @@ impl<'a, T> EventListenerFuture for SendInner<'a, T> {
}

// Sending failed - now start listening for notifications or wait for one.
if this.listener.is_listening() {
if this.listener.is_some() {
// Poll using the given strategy
ready!(S::poll(strategy, this.listener.as_mut(), context));
ready!(S::poll(strategy, &mut *this.listener, context));
} else {
this.listener.as_mut().listen(&this.sender.channel.send_ops);
*this.listener = Some(this.sender.channel.send_ops.listen());
}
}
}
Expand All @@ -1134,12 +1155,19 @@ easy_wrapper! {
pub(crate) wait();
}

pin_project_lite::pin_project! {
pin_project! {
#[derive(Debug)]
#[project(!Unpin)]
struct RecvInner<'a, T> {
// Reference to the receiver.
receiver: &'a Receiver<T>,

// Listener waiting on the channel.
listener: Option<EventListener>,

// Keeping this type `!Unpin` enables future optimizations.
#[pin]
listener: EventListener,
_pin: PhantomPinned
}
}

Expand All @@ -1152,7 +1180,7 @@ impl<'a, T> EventListenerFuture for RecvInner<'a, T> {
strategy: &mut S,
cx: &mut S::Context,
) -> Poll<Result<T, RecvError>> {
let mut this = self.project();
let this = self.project();

loop {
// Attempt to receive a message.
Expand All @@ -1163,13 +1191,11 @@ impl<'a, T> EventListenerFuture for RecvInner<'a, T> {
}

// Receiving failed - now start listening for notifications or wait for one.
if this.listener.is_listening() {
if this.listener.is_some() {
// Poll using the given strategy
ready!(S::poll(strategy, this.listener.as_mut(), cx));
ready!(S::poll(strategy, &mut *this.listener, cx));
} else {
this.listener
.as_mut()
.listen(&this.receiver.channel.recv_ops);
*this.listener = Some(this.receiver.channel.recv_ops.listen());
}
}
}
Expand Down

0 comments on commit efdddaa

Please sign in to comment.