Skip to content

Commit

Permalink
Use Receiver::try_recv instead of Stream::next with timeout
Browse files Browse the repository at this point in the history
  • Loading branch information
phil-opp committed Dec 10, 2024
1 parent 46ff789 commit 36cbb86
Showing 1 changed file with 10 additions and 10 deletions.
20 changes: 10 additions & 10 deletions apis/rust/node/src/event_stream/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ mod thread;

pub struct EventStream {
node_id: NodeId,
receiver: flume::r#async::RecvStream<'static, EventItem>,
receiver: flume::Receiver<EventItem>,
_thread_handle: EventStreamThreadHandle,
close_channel: DaemonChannel,
clock: Arc<uhlc::HLC>,
Expand Down Expand Up @@ -148,7 +148,7 @@ impl EventStream {

Ok(EventStream {
node_id: node_id.clone(),
receiver: rx.into_stream(),
receiver: rx,
_thread_handle: thread_handle,
close_channel,
clock,
Expand All @@ -169,16 +169,15 @@ impl EventStream {
pub async fn recv_async(&mut self) -> Option<Event> {
loop {
if self.scheduler.is_empty() {
if let Some(event) = self.receiver.next().await {
if let Ok(event) = self.receiver.recv_async().await {
self.scheduler.add_event(event);
} else {
break;
}
} else {
match select(Delay::new(Duration::from_micros(300)), self.receiver.next()).await {
Either::Left((_elapsed, _)) => break,
Either::Right((Some(event), _)) => self.scheduler.add_event(event),
Either::Right((None, _)) => break,
match self.receiver.try_recv() {
Ok(event) => self.scheduler.add_event(event),
Err(_) => break, // no other ready events
};
}
}
Expand All @@ -187,11 +186,11 @@ impl EventStream {
}

pub async fn recv_async_timeout(&mut self, dur: Duration) -> Option<Event> {
let next_event = match select(Delay::new(dur), self.receiver.next()).await {
let next_event = match select(Delay::new(dur), self.receiver.recv_async()).await {
Either::Left((_elapsed, _)) => {
Some(EventItem::TimeoutError(eyre!("Receiver timed out")))
}
Either::Right((event, _)) => event,
Either::Right((event, _)) => event.ok(),
};
next_event.map(Self::convert_event_item)
}
Expand Down Expand Up @@ -257,10 +256,11 @@ impl Stream for EventStream {
type Item = Event;

fn poll_next(
mut self: std::pin::Pin<&mut Self>,
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
self.receiver
.stream()
.poll_next_unpin(cx)
.map(|item| item.map(Self::convert_event_item))
}
Expand Down

0 comments on commit 36cbb86

Please sign in to comment.